Hook up the push rules stream to account_data in /sync

This commit is contained in:
Mark Haines 2016-03-04 14:44:01 +00:00
parent 3406eba4ef
commit 1b4f4a936f
4 changed files with 85 additions and 69 deletions

View File

@ -20,6 +20,7 @@ from synapse.api.constants import Membership, EventTypes
from synapse.util import unwrapFirstError from synapse.util import unwrapFirstError
from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.push.clientformat import format_push_rules_for_user
from twisted.internet import defer from twisted.internet import defer
@ -224,6 +225,10 @@ class SyncHandler(BaseHandler):
) )
) )
account_data['m.push_rules'] = yield self.push_rules_for_user(
sync_config.user
)
tags_by_room = yield self.store.get_tags_for_user( tags_by_room = yield self.store.get_tags_for_user(
sync_config.user.to_string() sync_config.user.to_string()
) )
@ -322,6 +327,14 @@ class SyncHandler(BaseHandler):
defer.returnValue(room_sync) defer.returnValue(room_sync)
@defer.inlineCallbacks
def push_rules_for_user(self, user):
user_id = user.to_string()
rawrules = yield self.store.get_push_rules_for_user(user_id)
enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id)
rules = format_push_rules_for_user(user, rawrules, enabled_map)
defer.returnValue(rules)
def account_data_for_user(self, account_data): def account_data_for_user(self, account_data):
account_data_events = [] account_data_events = []
@ -481,6 +494,15 @@ class SyncHandler(BaseHandler):
) )
) )
push_rules_changed = yield self.store.have_push_rules_changed_for_user(
user_id, int(since_token.push_rules_key)
)
if push_rules_changed:
account_data["m.push_rules"] = yield self.push_rules_for_user(
sync_config.user
)
# Get a list of membership change events that have happened. # Get a list of membership change events that have happened.
rooms_changed = yield self.store.get_membership_changes_for_user( rooms_changed = yield self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key user_id, since_token.room_key, now_token.room_key

View File

@ -156,7 +156,7 @@ class PushRuleRestServlet(ClientV1RestServlet):
return 200, {} return 200, {}
def notify_user(self, user_id): def notify_user(self, user_id):
stream_id = self.store.get_push_rules_stream_token() stream_id, _ = self.store.get_push_rules_stream_token()
self.notifier.on_new_event( self.notifier.on_new_event(
"push_rules_key", stream_id, users=[user_id] "push_rules_key", stream_id, users=[user_id]
) )

View File

@ -160,6 +160,11 @@ class DataStore(RoomMemberStore, RoomStore,
prefilled_cache=presence_cache_prefill prefilled_cache=presence_cache_prefill
) )
self.push_rules_stream_cache = StreamChangeCache(
"PushRulesStreamChangeCache",
self._push_rules_stream_id_gen.get_max_token()[0],
)
super(DataStore, self).__init__(hs) super(DataStore, self).__init__(hs)
def take_presence_startup_info(self): def take_presence_startup_info(self):

View File

@ -244,15 +244,10 @@ class PushRuleStore(SQLBaseStore):
) )
if update_stream: if update_stream:
self._simple_insert_txn( self._insert_push_rules_update_txn(
txn, txn, stream_id, stream_ordering, user_id, rule_id,
table="push_rules_stream", op="ADD",
values={ data={
"stream_id": stream_id,
"stream_ordering": stream_ordering,
"user_id": user_id,
"rule_id": rule_id,
"op": "ADD",
"priority_class": priority_class, "priority_class": priority_class,
"priority": priority, "priority": priority,
"conditions": conditions_json, "conditions": conditions_json,
@ -260,13 +255,6 @@ class PushRuleStore(SQLBaseStore):
} }
) )
txn.call_after(
self.get_push_rules_for_user.invalidate, (user_id,)
)
txn.call_after(
self.get_push_rules_enabled_for_user.invalidate, (user_id,)
)
@defer.inlineCallbacks @defer.inlineCallbacks
def delete_push_rule(self, user_id, rule_id): def delete_push_rule(self, user_id, rule_id):
""" """
@ -284,22 +272,10 @@ class PushRuleStore(SQLBaseStore):
"push_rules", "push_rules",
{'user_name': user_id, 'rule_id': rule_id}, {'user_name': user_id, 'rule_id': rule_id},
) )
self._simple_insert_txn(
txn, self._insert_push_rules_update_txn(
table="push_rules_stream", txn, stream_id, stream_ordering, user_id, rule_id,
values={ op="DELETE"
"stream_id": stream_id,
"stream_ordering": stream_ordering,
"user_id": user_id,
"rule_id": rule_id,
"op": "DELETE",
}
)
txn.call_after(
self.get_push_rules_for_user.invalidate, (user_id,)
)
txn.call_after(
self.get_push_rules_enabled_for_user.invalidate, (user_id,)
) )
with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering): with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering):
@ -328,23 +304,9 @@ class PushRuleStore(SQLBaseStore):
{'id': new_id}, {'id': new_id},
) )
self._simple_insert_txn( self._insert_push_rules_update_txn(
txn, txn, stream_id, stream_ordering, user_id, rule_id,
"push_rules_stream", op="ENABLE" if enabled else "DISABLE"
values={
"stream_id": stream_id,
"stream_ordering": stream_ordering,
"user_id": user_id,
"rule_id": rule_id,
"op": "ENABLE" if enabled else "DISABLE",
}
)
txn.call_after(
self.get_push_rules_for_user.invalidate, (user_id,)
)
txn.call_after(
self.get_push_rules_enabled_for_user.invalidate, (user_id,)
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@ -370,18 +332,31 @@ class PushRuleStore(SQLBaseStore):
{'actions': actions_json}, {'actions': actions_json},
) )
self._simple_insert_txn( self._insert_push_rules_update_txn(
txn, txn, stream_id, stream_ordering, user_id, rule_id,
"push_rules_stream", op="ACTIONS", data={"actions": actions_json}
)
with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering):
yield self.runInteraction(
"set_push_rule_actions", set_push_rule_actions_txn,
stream_id, stream_ordering
)
def _insert_push_rules_update_txn(
self, txn, stream_id, stream_ordering, user_id, rule_id, op, data=None
):
values = { values = {
"stream_id": stream_id, "stream_id": stream_id,
"stream_ordering": stream_ordering, "stream_ordering": stream_ordering,
"user_id": user_id, "user_id": user_id,
"rule_id": rule_id, "rule_id": rule_id,
"op": "ACTIONS", "op": op,
"actions": actions_json,
} }
) if data is not None:
values.update(data)
self._simple_insert_txn(txn, "push_rules_stream", values=values)
txn.call_after( txn.call_after(
self.get_push_rules_for_user.invalidate, (user_id,) self.get_push_rules_for_user.invalidate, (user_id,)
@ -389,11 +364,8 @@ class PushRuleStore(SQLBaseStore):
txn.call_after( txn.call_after(
self.get_push_rules_enabled_for_user.invalidate, (user_id,) self.get_push_rules_enabled_for_user.invalidate, (user_id,)
) )
txn.call_after(
with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering): self.push_rules_stream_cache.entity_has_changed, user_id, stream_id
yield self.runInteraction(
"set_push_rule_actions", set_push_rule_actions_txn,
stream_id, stream_ordering
) )
def get_all_push_rule_updates(self, last_id, current_id, limit): def get_all_push_rule_updates(self, last_id, current_id, limit):
@ -403,7 +375,7 @@ class PushRuleStore(SQLBaseStore):
"SELECT stream_id, stream_ordering, user_id, rule_id," "SELECT stream_id, stream_ordering, user_id, rule_id,"
" op, priority_class, priority, conditions, actions" " op, priority_class, priority, conditions, actions"
" FROM push_rules_stream" " FROM push_rules_stream"
" WHERE ? < stream_id and stream_id <= ?" " WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC LIMIT ?" " ORDER BY stream_id ASC LIMIT ?"
) )
txn.execute(sql, (last_id, current_id, limit)) txn.execute(sql, (last_id, current_id, limit))
@ -418,6 +390,23 @@ class PushRuleStore(SQLBaseStore):
room stream ordering it corresponds to.""" room stream ordering it corresponds to."""
return self._push_rules_stream_id_gen.get_max_token() return self._push_rules_stream_id_gen.get_max_token()
def have_push_rules_changed_for_user(self, user_id, last_id):
if not self.push_rules_stream_cache.has_entity_changed(user_id, last_id):
logger.error("FNARG")
return defer.succeed(False)
else:
def have_push_rules_changed_txn(txn):
sql = (
"SELECT COUNT(stream_id) FROM push_rules_stream"
" WHERE user_id = ? AND ? < stream_id"
)
txn.execute(sql, (user_id, last_id))
count, = txn.fetchone()
return bool(count)
return self.runInteraction(
"have_push_rules_changed", have_push_rules_changed_txn
)
class RuleNotFoundException(Exception): class RuleNotFoundException(Exception):
pass pass