mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-12-15 15:18:47 -05:00
Merge pull request #582 from matrix-org/erikj/presence
Rewrite presence for performance.
This commit is contained in:
commit
e5ad2e5267
30 changed files with 1572 additions and 3224 deletions
|
|
@ -20,7 +20,7 @@ from .appservice import (
|
|||
from ._base import Cache
|
||||
from .directory import DirectoryStore
|
||||
from .events import EventsStore
|
||||
from .presence import PresenceStore
|
||||
from .presence import PresenceStore, UserPresenceState
|
||||
from .profile import ProfileStore
|
||||
from .registration import RegistrationStore
|
||||
from .room import RoomStore
|
||||
|
|
@ -47,6 +47,7 @@ from .account_data import AccountDataStore
|
|||
|
||||
from util.id_generators import IdGenerator, StreamIdGenerator
|
||||
|
||||
from synapse.api.constants import PresenceState
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
||||
|
||||
|
|
@ -110,6 +111,9 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
self._account_data_id_gen = StreamIdGenerator(
|
||||
db_conn, "account_data_max_stream_id", "stream_id"
|
||||
)
|
||||
self._presence_id_gen = StreamIdGenerator(
|
||||
db_conn, "presence_stream", "stream_id"
|
||||
)
|
||||
|
||||
self._transaction_id_gen = IdGenerator("sent_transactions", "id", self)
|
||||
self._state_groups_id_gen = IdGenerator("state_groups", "id", self)
|
||||
|
|
@ -119,7 +123,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
self._push_rule_id_gen = IdGenerator("push_rules", "id", self)
|
||||
self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self)
|
||||
|
||||
events_max = self._stream_id_gen.get_max_token(None)
|
||||
events_max = self._stream_id_gen.get_max_token()
|
||||
event_cache_prefill, min_event_val = self._get_cache_dict(
|
||||
db_conn, "events",
|
||||
entity_column="room_id",
|
||||
|
|
@ -135,13 +139,31 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
"MembershipStreamChangeCache", events_max,
|
||||
)
|
||||
|
||||
account_max = self._account_data_id_gen.get_max_token(None)
|
||||
account_max = self._account_data_id_gen.get_max_token()
|
||||
self._account_data_stream_cache = StreamChangeCache(
|
||||
"AccountDataAndTagsChangeCache", account_max,
|
||||
)
|
||||
|
||||
self.__presence_on_startup = self._get_active_presence(db_conn)
|
||||
|
||||
presence_cache_prefill, min_presence_val = self._get_cache_dict(
|
||||
db_conn, "presence_stream",
|
||||
entity_column="user_id",
|
||||
stream_column="stream_id",
|
||||
max_value=self._presence_id_gen.get_max_token(),
|
||||
)
|
||||
self.presence_stream_cache = StreamChangeCache(
|
||||
"PresenceStreamChangeCache", min_presence_val,
|
||||
prefilled_cache=presence_cache_prefill
|
||||
)
|
||||
|
||||
super(DataStore, self).__init__(hs)
|
||||
|
||||
def take_presence_startup_info(self):
|
||||
active_on_startup = self.__presence_on_startup
|
||||
self.__presence_on_startup = None
|
||||
return active_on_startup
|
||||
|
||||
def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value):
|
||||
# Fetch a mapping of room_id -> max stream position for "recent" rooms.
|
||||
# It doesn't really matter how many we get, the StreamChangeCache will
|
||||
|
|
@ -161,6 +183,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
txn = db_conn.cursor()
|
||||
txn.execute(sql, (int(max_value),))
|
||||
rows = txn.fetchall()
|
||||
txn.close()
|
||||
|
||||
cache = {
|
||||
row[0]: int(row[1])
|
||||
|
|
@ -174,6 +197,28 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
|
||||
return cache, min_val
|
||||
|
||||
def _get_active_presence(self, db_conn):
|
||||
"""Fetch non-offline presence from the database so that we can register
|
||||
the appropriate time outs.
|
||||
"""
|
||||
|
||||
sql = (
|
||||
"SELECT user_id, state, last_active_ts, last_federation_update_ts,"
|
||||
" last_user_sync_ts, status_msg, currently_active FROM presence_stream"
|
||||
" WHERE state != ?"
|
||||
)
|
||||
sql = self.database_engine.convert_param_style(sql)
|
||||
|
||||
txn = db_conn.cursor()
|
||||
txn.execute(sql, (PresenceState.OFFLINE,))
|
||||
rows = self.cursor_to_dict(txn)
|
||||
txn.close()
|
||||
|
||||
for row in rows:
|
||||
row["currently_active"] = bool(row["currently_active"])
|
||||
|
||||
return [UserPresenceState(**row) for row in rows]
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def insert_client_ip(self, user, access_token, ip, user_agent):
|
||||
now = int(self._clock.time_msec())
|
||||
|
|
|
|||
|
|
@ -168,7 +168,7 @@ class AccountDataStore(SQLBaseStore):
|
|||
"add_room_account_data", add_account_data_txn, next_id
|
||||
)
|
||||
|
||||
result = yield self._account_data_id_gen.get_max_token(self)
|
||||
result = yield self._account_data_id_gen.get_max_token()
|
||||
defer.returnValue(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
@ -207,7 +207,7 @@ class AccountDataStore(SQLBaseStore):
|
|||
"add_user_account_data", add_account_data_txn, next_id
|
||||
)
|
||||
|
||||
result = yield self._account_data_id_gen.get_max_token(self)
|
||||
result = yield self._account_data_id_gen.get_max_token()
|
||||
defer.returnValue(result)
|
||||
|
||||
def _update_max_stream_id(self, txn, next_id):
|
||||
|
|
|
|||
|
|
@ -131,7 +131,7 @@ class EventsStore(SQLBaseStore):
|
|||
except _RollbackButIsFineException:
|
||||
pass
|
||||
|
||||
max_persisted_id = yield self._stream_id_gen.get_max_token(self)
|
||||
max_persisted_id = yield self._stream_id_gen.get_max_token()
|
||||
defer.returnValue((stream_ordering, max_persisted_id))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
# Remember to update this number every time a change is made to database
|
||||
# schema files, so the users will be informed on server restarts.
|
||||
SCHEMA_VERSION = 29
|
||||
SCHEMA_VERSION = 30
|
||||
|
||||
dir_path = os.path.abspath(os.path.dirname(__file__))
|
||||
|
||||
|
|
|
|||
|
|
@ -14,73 +14,129 @@
|
|||
# limitations under the License.
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
from synapse.api.constants import PresenceState
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||
|
||||
from collections import namedtuple
|
||||
from twisted.internet import defer
|
||||
|
||||
|
||||
class UserPresenceState(namedtuple("UserPresenceState",
|
||||
("user_id", "state", "last_active_ts",
|
||||
"last_federation_update_ts", "last_user_sync_ts",
|
||||
"status_msg", "currently_active"))):
|
||||
"""Represents the current presence state of the user.
|
||||
|
||||
user_id (str)
|
||||
last_active (int): Time in msec that the user last interacted with server.
|
||||
last_federation_update (int): Time in msec since either a) we sent a presence
|
||||
update to other servers or b) we received a presence update, depending
|
||||
on if is a local user or not.
|
||||
last_user_sync (int): Time in msec that the user last *completed* a sync
|
||||
(or event stream).
|
||||
status_msg (str): User set status message.
|
||||
"""
|
||||
|
||||
def copy_and_replace(self, **kwargs):
|
||||
return self._replace(**kwargs)
|
||||
|
||||
@classmethod
|
||||
def default(cls, user_id):
|
||||
"""Returns a default presence state.
|
||||
"""
|
||||
return cls(
|
||||
user_id=user_id,
|
||||
state=PresenceState.OFFLINE,
|
||||
last_active_ts=0,
|
||||
last_federation_update_ts=0,
|
||||
last_user_sync_ts=0,
|
||||
status_msg=None,
|
||||
currently_active=False,
|
||||
)
|
||||
|
||||
|
||||
class PresenceStore(SQLBaseStore):
|
||||
def create_presence(self, user_localpart):
|
||||
res = self._simple_insert(
|
||||
table="presence",
|
||||
values={"user_id": user_localpart},
|
||||
desc="create_presence",
|
||||
@defer.inlineCallbacks
|
||||
def update_presence(self, presence_states):
|
||||
stream_id_manager = yield self._presence_id_gen.get_next(self)
|
||||
with stream_id_manager as stream_id:
|
||||
yield self.runInteraction(
|
||||
"update_presence",
|
||||
self._update_presence_txn, stream_id, presence_states,
|
||||
)
|
||||
|
||||
defer.returnValue((stream_id, self._presence_id_gen.get_max_token()))
|
||||
|
||||
def _update_presence_txn(self, txn, stream_id, presence_states):
|
||||
for state in presence_states:
|
||||
txn.call_after(
|
||||
self.presence_stream_cache.entity_has_changed,
|
||||
state.user_id, stream_id,
|
||||
)
|
||||
|
||||
# Actually insert new rows
|
||||
self._simple_insert_many_txn(
|
||||
txn,
|
||||
table="presence_stream",
|
||||
values=[
|
||||
{
|
||||
"stream_id": stream_id,
|
||||
"user_id": state.user_id,
|
||||
"state": state.state,
|
||||
"last_active_ts": state.last_active_ts,
|
||||
"last_federation_update_ts": state.last_federation_update_ts,
|
||||
"last_user_sync_ts": state.last_user_sync_ts,
|
||||
"status_msg": state.status_msg,
|
||||
"currently_active": state.currently_active,
|
||||
}
|
||||
for state in presence_states
|
||||
],
|
||||
)
|
||||
|
||||
self.get_presence_state.invalidate((user_localpart,))
|
||||
return res
|
||||
|
||||
def has_presence_state(self, user_localpart):
|
||||
return self._simple_select_one(
|
||||
table="presence",
|
||||
keyvalues={"user_id": user_localpart},
|
||||
retcols=["user_id"],
|
||||
allow_none=True,
|
||||
desc="has_presence_state",
|
||||
# Delete old rows to stop database from getting really big
|
||||
sql = (
|
||||
"DELETE FROM presence_stream WHERE"
|
||||
" stream_id < ?"
|
||||
" AND user_id IN (%s)"
|
||||
)
|
||||
|
||||
@cached(max_entries=2000)
|
||||
def get_presence_state(self, user_localpart):
|
||||
return self._simple_select_one(
|
||||
table="presence",
|
||||
keyvalues={"user_id": user_localpart},
|
||||
retcols=["state", "status_msg", "mtime"],
|
||||
desc="get_presence_state",
|
||||
batches = (
|
||||
presence_states[i:i + 50]
|
||||
for i in xrange(0, len(presence_states), 50)
|
||||
)
|
||||
|
||||
@cachedList(get_presence_state.cache, list_name="user_localparts",
|
||||
inlineCallbacks=True)
|
||||
def get_presence_states(self, user_localparts):
|
||||
rows = yield self._simple_select_many_batch(
|
||||
table="presence",
|
||||
column="user_id",
|
||||
iterable=user_localparts,
|
||||
retcols=("user_id", "state", "status_msg", "mtime",),
|
||||
desc="get_presence_states",
|
||||
)
|
||||
|
||||
defer.returnValue({
|
||||
row["user_id"]: {
|
||||
"state": row["state"],
|
||||
"status_msg": row["status_msg"],
|
||||
"mtime": row["mtime"],
|
||||
}
|
||||
for row in rows
|
||||
})
|
||||
for states in batches:
|
||||
args = [stream_id]
|
||||
args.extend(s.user_id for s in states)
|
||||
txn.execute(
|
||||
sql % (",".join("?" for _ in states),),
|
||||
args
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def set_presence_state(self, user_localpart, new_state):
|
||||
res = yield self._simple_update_one(
|
||||
table="presence",
|
||||
keyvalues={"user_id": user_localpart},
|
||||
updatevalues={"state": new_state["state"],
|
||||
"status_msg": new_state["status_msg"],
|
||||
"mtime": self._clock.time_msec()},
|
||||
desc="set_presence_state",
|
||||
def get_presence_for_users(self, user_ids):
|
||||
rows = yield self._simple_select_many_batch(
|
||||
table="presence_stream",
|
||||
column="user_id",
|
||||
iterable=user_ids,
|
||||
keyvalues={},
|
||||
retcols=(
|
||||
"user_id",
|
||||
"state",
|
||||
"last_active_ts",
|
||||
"last_federation_update_ts",
|
||||
"last_user_sync_ts",
|
||||
"status_msg",
|
||||
"currently_active",
|
||||
),
|
||||
)
|
||||
|
||||
self.get_presence_state.invalidate((user_localpart,))
|
||||
defer.returnValue(res)
|
||||
for row in rows:
|
||||
row["currently_active"] = bool(row["currently_active"])
|
||||
|
||||
defer.returnValue([UserPresenceState(**row) for row in rows])
|
||||
|
||||
def get_current_presence_token(self):
|
||||
return self._presence_id_gen.get_max_token()
|
||||
|
||||
def allow_presence_visible(self, observed_localpart, observer_userid):
|
||||
return self._simple_insert(
|
||||
|
|
@ -128,6 +184,7 @@ class PresenceStore(SQLBaseStore):
|
|||
desc="set_presence_list_accepted",
|
||||
)
|
||||
self.get_presence_list_accepted.invalidate((observer_localpart,))
|
||||
self.get_presence_list_observers_accepted.invalidate((observed_userid,))
|
||||
defer.returnValue(result)
|
||||
|
||||
def get_presence_list(self, observer_localpart, accepted=None):
|
||||
|
|
@ -154,6 +211,19 @@ class PresenceStore(SQLBaseStore):
|
|||
desc="get_presence_list_accepted",
|
||||
)
|
||||
|
||||
@cachedInlineCallbacks()
|
||||
def get_presence_list_observers_accepted(self, observed_userid):
|
||||
user_localparts = yield self._simple_select_onecol(
|
||||
table="presence_list",
|
||||
keyvalues={"observed_user_id": observed_userid, "accepted": True},
|
||||
retcol="user_id",
|
||||
desc="get_presence_list_accepted",
|
||||
)
|
||||
|
||||
defer.returnValue([
|
||||
"@%s:%s" % (u, self.hs.hostname,) for u in user_localparts
|
||||
])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def del_presence_list(self, observer_localpart, observed_userid):
|
||||
yield self._simple_delete_one(
|
||||
|
|
@ -163,3 +233,4 @@ class PresenceStore(SQLBaseStore):
|
|||
desc="del_presence_list",
|
||||
)
|
||||
self.get_presence_list_accepted.invalidate((observer_localpart,))
|
||||
self.get_presence_list_observers_accepted.invalidate((observed_userid,))
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ class ReceiptsStore(SQLBaseStore):
|
|||
super(ReceiptsStore, self).__init__(hs)
|
||||
|
||||
self._receipts_stream_cache = StreamChangeCache(
|
||||
"ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token(None)
|
||||
"ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token()
|
||||
)
|
||||
|
||||
@cached(num_args=2)
|
||||
|
|
@ -222,7 +222,7 @@ class ReceiptsStore(SQLBaseStore):
|
|||
defer.returnValue(results)
|
||||
|
||||
def get_max_receipt_stream_id(self):
|
||||
return self._receipts_id_gen.get_max_token(self)
|
||||
return self._receipts_id_gen.get_max_token()
|
||||
|
||||
def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
|
||||
user_id, event_id, data, stream_id):
|
||||
|
|
@ -347,7 +347,7 @@ class ReceiptsStore(SQLBaseStore):
|
|||
room_id, receipt_type, user_id, event_ids, data
|
||||
)
|
||||
|
||||
max_persisted_id = yield self._stream_id_gen.get_max_token(self)
|
||||
max_persisted_id = yield self._stream_id_gen.get_max_token()
|
||||
|
||||
defer.returnValue((stream_id, max_persisted_id))
|
||||
|
||||
|
|
|
|||
30
synapse/storage/schema/delta/30/presence_stream.sql
Normal file
30
synapse/storage/schema/delta/30/presence_stream.sql
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
/* Copyright 2016 OpenMarket Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
CREATE TABLE presence_stream(
|
||||
stream_id BIGINT,
|
||||
user_id TEXT,
|
||||
state TEXT,
|
||||
last_active_ts BIGINT,
|
||||
last_federation_update_ts BIGINT,
|
||||
last_user_sync_ts BIGINT,
|
||||
status_msg TEXT,
|
||||
currently_active BOOLEAN
|
||||
);
|
||||
|
||||
CREATE INDEX presence_stream_id ON presence_stream(stream_id, user_id);
|
||||
CREATE INDEX presence_stream_user_id ON presence_stream(user_id);
|
||||
CREATE INDEX presence_stream_state ON presence_stream(state);
|
||||
|
|
@ -531,7 +531,7 @@ class StreamStore(SQLBaseStore):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def get_room_events_max_id(self, direction='f'):
|
||||
token = yield self._stream_id_gen.get_max_token(self)
|
||||
token = yield self._stream_id_gen.get_max_token()
|
||||
if direction != 'b':
|
||||
defer.returnValue("s%d" % (token,))
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ class TagsStore(SQLBaseStore):
|
|||
Returns:
|
||||
A deferred int.
|
||||
"""
|
||||
return self._account_data_id_gen.get_max_token(self)
|
||||
return self._account_data_id_gen.get_max_token()
|
||||
|
||||
@cached()
|
||||
def get_tags_for_user(self, user_id):
|
||||
|
|
@ -147,7 +147,7 @@ class TagsStore(SQLBaseStore):
|
|||
|
||||
self.get_tags_for_user.invalidate((user_id,))
|
||||
|
||||
result = yield self._account_data_id_gen.get_max_token(self)
|
||||
result = yield self._account_data_id_gen.get_max_token()
|
||||
defer.returnValue(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
@ -169,7 +169,7 @@ class TagsStore(SQLBaseStore):
|
|||
|
||||
self.get_tags_for_user.invalidate((user_id,))
|
||||
|
||||
result = yield self._account_data_id_gen.get_max_token(self)
|
||||
result = yield self._account_data_id_gen.get_max_token()
|
||||
defer.returnValue(result)
|
||||
|
||||
def _update_revision_txn(self, txn, user_id, room_id, next_id):
|
||||
|
|
|
|||
|
|
@ -130,7 +130,7 @@ class StreamIdGenerator(object):
|
|||
|
||||
return manager()
|
||||
|
||||
def get_max_token(self, store):
|
||||
def get_max_token(self):
|
||||
"""Returns the maximum stream id such that all stream ids less than or
|
||||
equal to it have been successfully persisted.
|
||||
"""
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue