Merge branch 'erikj/cache_perf' of github.com:matrix-org/synapse into develop

This commit is contained in:
Erik Johnston 2016-06-03 12:00:33 +01:00
commit 3b096c5f5c
2 changed files with 33 additions and 9 deletions

View File

@ -102,6 +102,15 @@ class ObservableDeferred(object):
def observers(self): def observers(self):
return self._observers return self._observers
def has_called(self):
return self._result is not None
def has_succeeded(self):
return self._result is not None and self._result[0] is True
def get_result(self):
return self._result[1]
def __getattr__(self, name): def __getattr__(self, name):
return getattr(self._deferred, name) return getattr(self._deferred, name)

View File

@ -33,6 +33,7 @@ import functools
import inspect import inspect
import threading import threading
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -302,16 +303,21 @@ class CacheListDescriptor(object):
# cached is a dict arg -> deferred, where deferred results in a # cached is a dict arg -> deferred, where deferred results in a
# 2-tuple (`arg`, `result`) # 2-tuple (`arg`, `result`)
cached = {} results = {}
cached_defers = {}
missing = [] missing = []
for arg in list_args: for arg in list_args:
key = list(keyargs) key = list(keyargs)
key[self.list_pos] = arg key[self.list_pos] = arg
try: try:
res = cache.get(tuple(key)).observe() res = cache.get(tuple(key))
if not res.has_succeeded():
res = res.observe()
res.addCallback(lambda r, arg: (arg, r), arg) res.addCallback(lambda r, arg: (arg, r), arg)
cached[arg] = res cached_defers[arg] = res
else:
results[arg] = res.get_result()
except KeyError: except KeyError:
missing.append(arg) missing.append(arg)
@ -349,12 +355,21 @@ class CacheListDescriptor(object):
res = observer.observe() res = observer.observe()
res.addCallback(lambda r, arg: (arg, r), arg) res.addCallback(lambda r, arg: (arg, r), arg)
cached[arg] = res cached_defers[arg] = res
if cached_defers:
def update_results_dict(res):
results.update(res)
return results
return preserve_context_over_deferred(defer.gatherResults( return preserve_context_over_deferred(defer.gatherResults(
cached.values(), cached_defers.values(),
consumeErrors=True, consumeErrors=True,
).addErrback(unwrapFirstError).addCallback(lambda res: dict(res))) ).addCallback(update_results_dict).addErrback(
unwrapFirstError
))
else:
return results
obj.__dict__[self.orig.__name__] = wrapped obj.__dict__[self.orig.__name__] = wrapped