Merge pull request #505 from matrix-org/erikj/push_fast

Push actions perf
This commit is contained in:
Erik Johnston 2016-01-19 16:16:05 +00:00
commit 40d9765123
7 changed files with 49 additions and 14 deletions

View File

@ -266,7 +266,7 @@ class BaseHandler(object):
event, context=context event, context=context
) )
action_generator = ActionGenerator(self.store) action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event( yield action_generator.handle_push_actions_for_event(
event, self event, self
) )

View File

@ -245,7 +245,7 @@ class FederationHandler(BaseHandler):
yield user_joined_room(self.distributor, user, event.room_id) yield user_joined_room(self.distributor, user, event.room_id)
if not backfilled and not event.internal_metadata.is_outlier(): if not backfilled and not event.internal_metadata.is_outlier():
action_generator = ActionGenerator(self.store) action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event( yield action_generator.handle_push_actions_for_event(
event, self event, self
) )

View File

@ -25,8 +25,9 @@ logger = logging.getLogger(__name__)
class ActionGenerator: class ActionGenerator:
def __init__(self, store): def __init__(self, hs):
self.store = store self.hs = hs
self.store = hs.get_datastore()
# really we want to get all user ids and all profile tags too, # really we want to get all user ids and all profile tags too,
# since we want the actions for each profile tag for every user and # since we want the actions for each profile tag for every user and
# also actions for a client with no profile tag for each user. # also actions for a client with no profile tag for each user.
@ -42,7 +43,7 @@ class ActionGenerator:
) )
bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id(
event.room_id, self.store event.room_id, self.hs, self.store
) )
actions_by_user = yield bulk_evaluator.action_for_event_by_user(event, handler) actions_by_user = yield bulk_evaluator.action_for_event_by_user(event, handler)

View File

@ -36,6 +36,7 @@ def decode_rule_json(rule):
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_rules(room_id, user_ids, store): def _get_rules(room_id, user_ids, store):
rules_by_user = yield store.bulk_get_push_rules(user_ids) rules_by_user = yield store.bulk_get_push_rules(user_ids)
rules_by_user = { rules_by_user = {
uid: baserules.list_with_base_rules([ uid: baserules.list_with_base_rules([
decode_rule_json(rule_list) decode_rule_json(rule_list)
@ -47,12 +48,16 @@ def _get_rules(room_id, user_ids, store):
@defer.inlineCallbacks @defer.inlineCallbacks
def evaluator_for_room_id(room_id, store): def evaluator_for_room_id(room_id, hs, store):
users = yield store.get_users_in_room(room_id) results = yield store.get_receipts_for_room(room_id, "m.read")
rules_by_user = yield _get_rules(room_id, users, store) user_ids = [
row["user_id"] for row in results
if hs.is_mine_id(row["user_id"])
]
rules_by_user = yield _get_rules(room_id, user_ids, store)
defer.returnValue(BulkPushRuleEvaluator( defer.returnValue(BulkPushRuleEvaluator(
room_id, rules_by_user, users, store room_id, rules_by_user, user_ids, store
)) ))
@ -129,7 +134,7 @@ def _condition_checker(evaluator, conditions, uid, display_name, cache):
res = evaluator.matches(cond, uid, display_name, None) res = evaluator.matches(cond, uid, display_name, None)
if _id: if _id:
cache[_id] = res cache[_id] = bool(res)
if not res: if not res:
return False return False

View File

@ -22,6 +22,7 @@ import simplejson as json
import re import re
from synapse.types import UserID from synapse.types import UserID
from synapse.util.caches.lrucache import LruCache
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -277,18 +278,18 @@ def _glob_matches(glob, value, word_boundary=False):
) )
if word_boundary: if word_boundary:
r = r"\b%s\b" % (r,) r = r"\b%s\b" % (r,)
r = re.compile(r, flags=re.IGNORECASE) r = _compile_regex(r)
return r.search(value) return r.search(value)
else: else:
r = r + "$" r = r + "$"
r = re.compile(r, flags=re.IGNORECASE) r = _compile_regex(r)
return r.match(value) return r.match(value)
elif word_boundary: elif word_boundary:
r = re.escape(glob) r = re.escape(glob)
r = r"\b%s\b" % (r,) r = r"\b%s\b" % (r,)
r = re.compile(r, flags=re.IGNORECASE) r = _compile_regex(r)
return r.search(value) return r.search(value)
else: else:
@ -306,3 +307,16 @@ def _flatten_dict(d, prefix=[], result={}):
_flatten_dict(value, prefix=(prefix+[key]), result=result) _flatten_dict(value, prefix=(prefix+[key]), result=result)
return result return result
regex_cache = LruCache(5000)
def _compile_regex(regex_str):
r = regex_cache.get(regex_str, None)
if r:
return r
r = re.compile(regex_str, flags=re.IGNORECASE)
regex_cache[regex_str] = r
return r

View File

@ -139,6 +139,9 @@ class BaseHomeServer(object):
def is_mine(self, domain_specific_string): def is_mine(self, domain_specific_string):
return domain_specific_string.domain == self.hostname return domain_specific_string.domain == self.hostname
def is_mine_id(self, string):
return string.split(":", 1)[1] == self.hostname
# Build magic accessors for every dependency # Build magic accessors for every dependency
for depname in BaseHomeServer.DEPENDENCIES: for depname in BaseHomeServer.DEPENDENCIES:
BaseHomeServer._make_dependency_method(depname) BaseHomeServer._make_dependency_method(depname)

View File

@ -14,7 +14,7 @@
# limitations under the License. # limitations under the License.
from ._base import SQLBaseStore from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached
from synapse.util.caches import cache_counter, caches_by_name from synapse.util.caches import cache_counter, caches_by_name
from twisted.internet import defer from twisted.internet import defer
@ -33,6 +33,18 @@ class ReceiptsStore(SQLBaseStore):
self._receipts_stream_cache = _RoomStreamChangeCache() self._receipts_stream_cache = _RoomStreamChangeCache()
@cached(num_args=2)
def get_receipts_for_room(self, room_id, receipt_type):
return self._simple_select_list(
table="receipts_linearized",
keyvalues={
"room_id": room_id,
"receipt_type": receipt_type,
},
retcols=("user_id", "event_id"),
desc="get_receipts_for_room",
)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
"""Get receipts for multiple rooms for sending to clients. """Get receipts for multiple rooms for sending to clients.