Prefill stream change caches

This commit is contained in:
Erik Johnston 2016-01-29 14:37:59 +00:00
parent ebc5f00efe
commit 18579534ea
4 changed files with 52 additions and 19 deletions

View File

@ -45,9 +45,10 @@ from .search import SearchStore
from .tags import TagsStore from .tags import TagsStore
from .account_data import AccountDataStore from .account_data import AccountDataStore
from util.id_generators import IdGenerator, StreamIdGenerator from util.id_generators import IdGenerator, StreamIdGenerator
from synapse.util.caches.stream_change_cache import StreamChangeCache
import logging import logging
@ -117,8 +118,54 @@ class DataStore(RoomMemberStore, RoomStore,
self._push_rule_id_gen = IdGenerator("push_rules", "id", self) self._push_rule_id_gen = IdGenerator("push_rules", "id", self)
self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self) self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self)
events_max = self._stream_id_gen.get_max_token(None)
event_cache_prefill = self._get_cache_dict(
db_conn, "events",
entity_column="room_id",
stream_column="stream_ordering",
max_value=events_max,
)
self._events_stream_cache = StreamChangeCache(
"EventsRoomStreamChangeCache", events_max,
prefilled_cache=event_cache_prefill,
)
account_max = self._account_data_id_gen.get_max_token(None)
account_cache_prefill = self._get_cache_dict(
db_conn, "account_data",
entity_column="user_id",
stream_column="stream_id",
max_value=account_max,
)
self._account_data_stream_cache = StreamChangeCache(
"AccountDataAndTagsChangeCache", account_max,
prefilled_cache=account_cache_prefill,
)
super(DataStore, self).__init__(hs) super(DataStore, self).__init__(hs)
def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value):
sql = (
"SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s"
" WHERE %(stream)s > max(? - 100000, 0)"
" GROUP BY %(entity)s"
" ORDER BY MAX(%(stream)s) DESC"
" LIMIT 10000"
) % {
"table": table,
"entity": entity_column,
"stream": stream_column,
}
txn = db_conn.cursor()
txn.execute(sql, (int(max_value),))
rows = txn.fetchall()
return {
row[0]: row[1]
for row in rows
}
@defer.inlineCallbacks @defer.inlineCallbacks
def insert_client_ip(self, user, access_token, ip, user_agent): def insert_client_ip(self, user, access_token, ip, user_agent):
now = int(self._clock.time_msec()) now = int(self._clock.time_msec())

View File

@ -14,7 +14,6 @@
# limitations under the License. # limitations under the License.
from ._base import SQLBaseStore from ._base import SQLBaseStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
from twisted.internet import defer from twisted.internet import defer
import ujson as json import ujson as json
@ -24,14 +23,6 @@ logger = logging.getLogger(__name__)
class AccountDataStore(SQLBaseStore): class AccountDataStore(SQLBaseStore):
def __init__(self, hs):
super(AccountDataStore, self).__init__(hs)
self._account_data_stream_cache = StreamChangeCache(
"AccountDataAndTagsChangeCache",
self._account_data_id_gen.get_max_token(None),
max_size=10000,
)
def get_account_data_for_user(self, user_id): def get_account_data_for_user(self, user_id):
"""Get all the client account_data for a user. """Get all the client account_data for a user.

View File

@ -37,7 +37,6 @@ from twisted.internet import defer
from ._base import SQLBaseStore from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken from synapse.types import RoomStreamToken
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
@ -78,13 +77,6 @@ def upper_bound(token):
class StreamStore(SQLBaseStore): class StreamStore(SQLBaseStore):
def __init__(self, hs):
super(StreamStore, self).__init__(hs)
self._events_stream_cache = StreamChangeCache(
"EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None)
)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_appservice_room_stream(self, service, from_key, to_key, limit=0): def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
# NB this lives here instead of appservice.py so we can reuse the # NB this lives here instead of appservice.py so we can reuse the

View File

@ -32,7 +32,7 @@ class StreamChangeCache(object):
entities that may have changed since that position. If position key is too entities that may have changed since that position. If position key is too
old then the cache will simply return all given entities. old then the cache will simply return all given entities.
""" """
def __init__(self, name, current_stream_pos, max_size=10000): def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
self._max_size = max_size self._max_size = max_size
self._entity_to_key = {} self._entity_to_key = {}
self._cache = sorteddict() self._cache = sorteddict()
@ -40,6 +40,9 @@ class StreamChangeCache(object):
self.name = name self.name = name
caches_by_name[self.name] = self._cache caches_by_name[self.name] = self._cache
for entity, stream_pos in prefilled_cache.items():
self.entity_has_changed(entity, stream_pos)
def has_entity_changed(self, entity, stream_pos): def has_entity_changed(self, entity, stream_pos):
"""Returns True if the entity may have been updated since stream_pos """Returns True if the entity may have been updated since stream_pos
""" """