Merge branch 'develop' of github.com:matrix-org/synapse into erikj/sync

This commit is contained in:
Erik Johnston 2016-01-22 15:57:12 +00:00
commit 88baa3865e
8 changed files with 282 additions and 26 deletions

View File

@ -36,6 +36,7 @@ def decode_rule_json(rule):
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_rules(room_id, user_ids, store): def _get_rules(room_id, user_ids, store):
rules_by_user = yield store.bulk_get_push_rules(user_ids) rules_by_user = yield store.bulk_get_push_rules(user_ids)
rules_enabled_by_user = yield store.bulk_get_push_rules_enabled(user_ids)
rules_by_user = { rules_by_user = {
uid: baserules.list_with_base_rules([ uid: baserules.list_with_base_rules([
@ -44,6 +45,22 @@ def _get_rules(room_id, user_ids, store):
]) ])
for uid in user_ids for uid in user_ids
} }
# We apply the rules-enabled map here: bulk_get_push_rules doesn't
# fetch disabled rules, but this won't account for any server default
# rules the user has disabled, so we need to do this too.
for uid in user_ids:
if uid not in rules_enabled_by_user:
continue
user_enabled_map = rules_enabled_by_user[uid]
for rule in rules_by_user[uid]:
rule_id = rule['rule_id']
if rule_id in user_enabled_map:
rule['enabled'] = user_enabled_map[rule_id]
defer.returnValue(rules_by_user) defer.returnValue(rules_by_user)

View File

@ -40,14 +40,20 @@ class EventPushActionsStore(SQLBaseStore):
'actions': json.dumps(actions) 'actions': json.dumps(actions)
}) })
def f(txn):
for uid, _, __ in tuples:
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(event.room_id, uid)
)
return self._simple_insert_many_txn(txn, "event_push_actions", values)
yield self.runInteraction( yield self.runInteraction(
"set_actions_for_event_and_users", "set_actions_for_event_and_users",
self._simple_insert_many_txn, f,
"event_push_actions",
values
) )
@cachedInlineCallbacks(num_args=3) @cachedInlineCallbacks(num_args=3, lru=True, tree=True)
def get_unread_event_push_actions_by_room_for_user( def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id self, room_id, user_id, last_read_event_id
): ):
@ -98,6 +104,11 @@ class EventPushActionsStore(SQLBaseStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def remove_push_actions_for_event_id(self, room_id, event_id): def remove_push_actions_for_event_id(self, room_id, event_id):
def f(txn): def f(txn):
# Sad that we have to blow away the cache for the whole room here
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(room_id,)
)
txn.execute( txn.execute(
"DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
(room_id, event_id) (room_id, event_id)

View File

@ -93,6 +93,35 @@ class PushRuleStore(SQLBaseStore):
results.setdefault(row['user_name'], []).append(row) results.setdefault(row['user_name'], []).append(row)
defer.returnValue(results) defer.returnValue(results)
@defer.inlineCallbacks
def bulk_get_push_rules_enabled(self, user_ids):
if not user_ids:
defer.returnValue({})
batch_size = 100
def f(txn, user_ids_to_fetch):
sql = (
"SELECT user_name, rule_id, enabled"
" FROM push_rules_enable"
" WHERE user_name"
" IN (" + ",".join("?" for _ in user_ids_to_fetch) + ")"
)
txn.execute(sql, user_ids_to_fetch)
return self.cursor_to_dict(txn)
results = {}
chunks = [user_ids[i:i+batch_size] for i in xrange(0, len(user_ids), batch_size)]
for batch_user_ids in chunks:
rows = yield self.runInteraction(
"bulk_get_push_rules_enabled", f, batch_user_ids
)
for row in rows:
results.setdefault(row['user_name'], {})[row['rule_id']] = row['enabled']
defer.returnValue(results)
@defer.inlineCallbacks @defer.inlineCallbacks
def add_push_rule(self, before, after, **kwargs): def add_push_rule(self, before, after, **kwargs):
vals = kwargs vals = kwargs

View File

@ -17,6 +17,7 @@ import logging
from synapse.util.async import ObservableDeferred from synapse.util.async import ObservableDeferred
from synapse.util import unwrapFirstError from synapse.util import unwrapFirstError
from synapse.util.caches.lrucache import LruCache from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache
from . import caches_by_name, DEBUG_CACHES, cache_counter from . import caches_by_name, DEBUG_CACHES, cache_counter
@ -36,9 +37,12 @@ _CacheSentinel = object()
class Cache(object): class Cache(object):
def __init__(self, name, max_entries=1000, keylen=1, lru=True): def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False):
if lru: if lru:
self.cache = LruCache(max_size=max_entries) cache_type = TreeCache if tree else dict
self.cache = LruCache(
max_size=max_entries, keylen=keylen, cache_type=cache_type
)
self.max_entries = None self.max_entries = None
else: else:
self.cache = OrderedDict() self.cache = OrderedDict()
@ -99,6 +103,15 @@ class Cache(object):
self.sequence += 1 self.sequence += 1
self.cache.pop(key, None) self.cache.pop(key, None)
def invalidate_many(self, key):
self.check_thread()
if not isinstance(key, tuple):
raise TypeError(
"The cache key must be a tuple not %r" % (type(key),)
)
self.sequence += 1
self.cache.del_multi(key)
def invalidate_all(self): def invalidate_all(self):
self.check_thread() self.check_thread()
self.sequence += 1 self.sequence += 1
@ -122,7 +135,7 @@ class CacheDescriptor(object):
which can be used to insert values into the cache specifically, without which can be used to insert values into the cache specifically, without
calling the calculation function. calling the calculation function.
""" """
def __init__(self, orig, max_entries=1000, num_args=1, lru=True, def __init__(self, orig, max_entries=1000, num_args=1, lru=True, tree=False,
inlineCallbacks=False): inlineCallbacks=False):
self.orig = orig self.orig = orig
@ -134,6 +147,7 @@ class CacheDescriptor(object):
self.max_entries = max_entries self.max_entries = max_entries
self.num_args = num_args self.num_args = num_args
self.lru = lru self.lru = lru
self.tree = tree
self.arg_names = inspect.getargspec(orig).args[1:num_args+1] self.arg_names = inspect.getargspec(orig).args[1:num_args+1]
@ -149,6 +163,7 @@ class CacheDescriptor(object):
max_entries=self.max_entries, max_entries=self.max_entries,
keylen=self.num_args, keylen=self.num_args,
lru=self.lru, lru=self.lru,
tree=self.tree,
) )
def __get__(self, obj, objtype=None): def __get__(self, obj, objtype=None):
@ -200,6 +215,7 @@ class CacheDescriptor(object):
wrapped.invalidate = self.cache.invalidate wrapped.invalidate = self.cache.invalidate
wrapped.invalidate_all = self.cache.invalidate_all wrapped.invalidate_all = self.cache.invalidate_all
wrapped.invalidate_many = self.cache.invalidate_many
wrapped.prefill = self.cache.prefill wrapped.prefill = self.cache.prefill
obj.__dict__[self.orig.__name__] = wrapped obj.__dict__[self.orig.__name__] = wrapped
@ -321,21 +337,23 @@ class CacheListDescriptor(object):
return wrapped return wrapped
def cached(max_entries=1000, num_args=1, lru=True): def cached(max_entries=1000, num_args=1, lru=True, tree=False):
return lambda orig: CacheDescriptor(
orig,
max_entries=max_entries,
num_args=num_args,
lru=lru
)
def cachedInlineCallbacks(max_entries=1000, num_args=1, lru=False):
return lambda orig: CacheDescriptor( return lambda orig: CacheDescriptor(
orig, orig,
max_entries=max_entries, max_entries=max_entries,
num_args=num_args, num_args=num_args,
lru=lru, lru=lru,
tree=tree,
)
def cachedInlineCallbacks(max_entries=1000, num_args=1, lru=False, tree=False):
return lambda orig: CacheDescriptor(
orig,
max_entries=max_entries,
num_args=num_args,
lru=lru,
tree=tree,
inlineCallbacks=True, inlineCallbacks=True,
) )

View File

@ -17,11 +17,27 @@
from functools import wraps from functools import wraps
import threading import threading
from synapse.util.caches.treecache import TreeCache
def enumerate_leaves(node, depth):
if depth == 0:
yield node
else:
for n in node.values():
for m in enumerate_leaves(n, depth - 1):
yield m
class LruCache(object): class LruCache(object):
"""Least-recently-used cache.""" """
def __init__(self, max_size): Least-recently-used cache.
cache = {} Supports del_multi only if cache_type=TreeCache
If cache_type=TreeCache, all keys must be tuples.
"""
def __init__(self, max_size, keylen=1, cache_type=dict):
cache = cache_type()
self.size = 0
list_root = [] list_root = []
list_root[:] = [list_root, list_root, None, None] list_root[:] = [list_root, list_root, None, None]
@ -44,6 +60,7 @@ class LruCache(object):
prev_node[NEXT] = node prev_node[NEXT] = node
next_node[PREV] = node next_node[PREV] = node
cache[key] = node cache[key] = node
self.size += 1
def move_node_to_front(node): def move_node_to_front(node):
prev_node = node[PREV] prev_node = node[PREV]
@ -62,7 +79,7 @@ class LruCache(object):
next_node = node[NEXT] next_node = node[NEXT]
prev_node[NEXT] = next_node prev_node[NEXT] = next_node
next_node[PREV] = prev_node next_node[PREV] = prev_node
cache.pop(node[KEY], None) self.size -= 1
@synchronized @synchronized
def cache_get(key, default=None): def cache_get(key, default=None):
@ -81,8 +98,10 @@ class LruCache(object):
node[VALUE] = value node[VALUE] = value
else: else:
add_node(key, value) add_node(key, value)
if len(cache) > max_size: if self.size > max_size:
delete_node(list_root[PREV]) todelete = list_root[PREV]
delete_node(todelete)
cache.pop(todelete[KEY], None)
@synchronized @synchronized
def cache_set_default(key, value): def cache_set_default(key, value):
@ -91,8 +110,10 @@ class LruCache(object):
return node[VALUE] return node[VALUE]
else: else:
add_node(key, value) add_node(key, value)
if len(cache) > max_size: if self.size > max_size:
delete_node(list_root[PREV]) todelete = list_root[PREV]
delete_node(todelete)
cache.pop(todelete[KEY], None)
return value return value
@synchronized @synchronized
@ -100,10 +121,22 @@ class LruCache(object):
node = cache.get(key, None) node = cache.get(key, None)
if node: if node:
delete_node(node) delete_node(node)
cache.pop(node[KEY], None)
return node[VALUE] return node[VALUE]
else: else:
return default return default
@synchronized
def cache_del_multi(key):
"""
This will only work if constructed with cache_type=TreeCache
"""
popped = cache.pop(key)
if popped is None:
return
for leaf in enumerate_leaves(popped, keylen - len(key)):
delete_node(leaf)
@synchronized @synchronized
def cache_clear(): def cache_clear():
list_root[NEXT] = list_root list_root[NEXT] = list_root
@ -112,7 +145,7 @@ class LruCache(object):
@synchronized @synchronized
def cache_len(): def cache_len():
return len(cache) return self.size
@synchronized @synchronized
def cache_contains(key): def cache_contains(key):
@ -123,6 +156,8 @@ class LruCache(object):
self.set = cache_set self.set = cache_set
self.setdefault = cache_set_default self.setdefault = cache_set_default
self.pop = cache_pop self.pop = cache_pop
if cache_type is TreeCache:
self.del_multi = cache_del_multi
self.len = cache_len self.len = cache_len
self.contains = cache_contains self.contains = cache_contains
self.clear = cache_clear self.clear = cache_clear

View File

@ -0,0 +1,60 @@
SENTINEL = object()
class TreeCache(object):
"""
Tree-based backing store for LruCache. Allows subtrees of data to be deleted
efficiently.
Keys must be tuples.
"""
def __init__(self):
self.root = {}
def __setitem__(self, key, value):
return self.set(key, value)
def __contains__(self, key):
return self.get(key, SENTINEL) is not SENTINEL
def set(self, key, value):
node = self.root
for k in key[:-1]:
node = node.setdefault(k, {})
node[key[-1]] = value
def get(self, key, default=None):
node = self.root
for k in key[:-1]:
node = node.get(k, None)
if node is None:
return default
return node.get(key[-1], default)
def clear(self):
self.root = {}
def pop(self, key, default=None):
nodes = []
node = self.root
for k in key[:-1]:
node = node.get(k, None)
nodes.append(node) # don't add the root node
if node is None:
return default
popped = node.pop(key[-1], SENTINEL)
if popped is SENTINEL:
return default
node_and_keys = zip(nodes, key)
node_and_keys.reverse()
node_and_keys.append((self.root, None))
for i in range(len(node_and_keys) - 1):
n, k = node_and_keys[i]
if n:
break
node_and_keys[i+1][0].pop(k)
return popped

View File

@ -17,6 +17,7 @@
from .. import unittest from .. import unittest
from synapse.util.caches.lrucache import LruCache from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache
class LruCacheTestCase(unittest.TestCase): class LruCacheTestCase(unittest.TestCase):
@ -52,3 +53,22 @@ class LruCacheTestCase(unittest.TestCase):
cache["key"] = 1 cache["key"] = 1
self.assertEquals(cache.pop("key"), 1) self.assertEquals(cache.pop("key"), 1)
self.assertEquals(cache.pop("key"), None) self.assertEquals(cache.pop("key"), None)
def test_del_multi(self):
cache = LruCache(4, 2, cache_type=TreeCache)
cache[("animal", "cat")] = "mew"
cache[("animal", "dog")] = "woof"
cache[("vehicles", "car")] = "vroom"
cache[("vehicles", "train")] = "chuff"
self.assertEquals(len(cache), 4)
self.assertEquals(cache.get(("animal", "cat")), "mew")
self.assertEquals(cache.get(("vehicles", "car")), "vroom")
cache.del_multi(("animal",))
self.assertEquals(len(cache), 2)
self.assertEquals(cache.get(("animal", "cat")), None)
self.assertEquals(cache.get(("animal", "dog")), None)
self.assertEquals(cache.get(("vehicles", "car")), "vroom")
self.assertEquals(cache.get(("vehicles", "train")), "chuff")
# Man from del_multi say "Yes".

View File

@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .. import unittest
from synapse.util.caches.treecache import TreeCache
class TreeCacheTestCase(unittest.TestCase):
def test_get_set_onelevel(self):
cache = TreeCache()
cache[("a",)] = "A"
cache[("b",)] = "B"
self.assertEquals(cache.get(("a",)), "A")
self.assertEquals(cache.get(("b",)), "B")
def test_pop_onelevel(self):
cache = TreeCache()
cache[("a",)] = "A"
cache[("b",)] = "B"
self.assertEquals(cache.pop(("a",)), "A")
self.assertEquals(cache.pop(("a",)), None)
self.assertEquals(cache.get(("b",)), "B")
def test_get_set_twolevel(self):
cache = TreeCache()
cache[("a", "a")] = "AA"
cache[("a", "b")] = "AB"
cache[("b", "a")] = "BA"
self.assertEquals(cache.get(("a", "a")), "AA")
self.assertEquals(cache.get(("a", "b")), "AB")
self.assertEquals(cache.get(("b", "a")), "BA")
def test_pop_twolevel(self):
cache = TreeCache()
cache[("a", "a")] = "AA"
cache[("a", "b")] = "AB"
cache[("b", "a")] = "BA"
self.assertEquals(cache.pop(("a", "a")), "AA")
self.assertEquals(cache.get(("a", "a")), None)
self.assertEquals(cache.get(("a", "b")), "AB")
self.assertEquals(cache.pop(("b", "a")), "BA")
self.assertEquals(cache.pop(("b", "a")), None)
def test_pop_mixedlevel(self):
cache = TreeCache()
cache[("a", "a")] = "AA"
cache[("a", "b")] = "AB"
cache[("b", "a")] = "BA"
self.assertEquals(cache.get(("a", "a")), "AA")
cache.pop(("a",))
self.assertEquals(cache.get(("a", "a")), None)
self.assertEquals(cache.get(("a", "b")), None)
self.assertEquals(cache.get(("b", "a")), "BA")