Use dictionary cache to do group -> state fetching

This commit is contained in:
Erik Johnston 2015-08-05 15:06:51 +01:00
parent c67ba143fa
commit 07507643cb
7 changed files with 195 additions and 110 deletions

View file

@ -18,6 +18,7 @@ from synapse.api.errors import StoreError
from synapse.util.logutils import log_function
from synapse.util.logcontext import preserve_context_over_fn, LoggingContext
from synapse.util.lrucache import LruCache
from synapse.util.dictionary_cache import DictionaryCache
import synapse.metrics
from util.id_generators import IdGenerator, StreamIdGenerator
@ -87,23 +88,33 @@ class Cache(object):
)
def get(self, *keyargs):
if len(keyargs) != self.keylen:
raise ValueError("Expected a key to have %d items", self.keylen)
try:
if len(keyargs) != self.keylen:
raise ValueError("Expected a key to have %d items", self.keylen)
val = self.cache.get(keyargs, self.sentinel)
if val is not self.sentinel:
cache_counter.inc_hits(self.name)
return val
val = self.cache.get(keyargs, self.sentinel)
if val is not self.sentinel:
cache_counter.inc_hits(self.name)
return val
cache_counter.inc_misses(self.name)
raise KeyError()
cache_counter.inc_misses(self.name)
raise KeyError()
except KeyError:
raise
except:
logger.exception("Cache.get failed for %s" % (self.name,))
raise
def update(self, sequence, *args):
self.check_thread()
if self.sequence == sequence:
# Only update the cache if the caches sequence number matches the
# number that the cache had before the SELECT was started (SYN-369)
self.prefill(*args)
try:
self.check_thread()
if self.sequence == sequence:
# Only update the cache if the caches sequence number matches the
# number that the cache had before the SELECT was started (SYN-369)
self.prefill(*args)
except:
logger.exception("Cache.update failed for %s" % (self.name,))
raise
def prefill(self, *args): # because I can't *keyargs, value
keyargs = args[:-1]
@ -327,6 +338,8 @@ class SQLBaseStore(object):
self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
max_entries=hs.config.event_cache_size)
self._state_group_cache = DictionaryCache("*stateGroupCache*", 100000)
self._event_fetch_lock = threading.Condition()
self._event_fetch_list = []
self._event_fetch_ongoing = 0