From 21f135ba763a583ecf9ba2714b5151f6b14b61fd Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 10 Dec 2015 16:26:08 +0000 Subject: [PATCH 01/33] Very first cut of calculating actions for events as they come in. Doesn't store them yet. Not very efficient. --- synapse/handlers/_base.py | 8 ++++++ synapse/handlers/federation.py | 15 +++++++++- synapse/push/action_generator.py | 47 ++++++++++++++++++++++++++++++++ synapse/storage/registration.py | 12 ++++++++ 4 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 synapse/push/action_generator.py diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 5fd20285d..a8e8c4f5a 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -19,9 +19,12 @@ from synapse.api.errors import LimitExceededError, SynapseError, AuthError from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.api.constants import Membership, EventTypes from synapse.types import UserID, RoomAlias +from synapse.push.action_generator import ActionGenerator from synapse.util.logcontext import PreserveLoggingContext +from synapse.events.utils import serialize_event + import logging @@ -264,6 +267,11 @@ class BaseHandler(object): event, context=context ) + action_generator = ActionGenerator(self.store) + yield action_generator.handle_event(serialize_event( + event, self.clock.time_msec() + )) + destinations = set(extra_destinations) for k, s in context.current_state.items(): try: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2855f2d7c..18289eb52 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -32,10 +32,12 @@ from synapse.crypto.event_signing import ( ) from synapse.types import UserID -from synapse.events.utils import prune_event +from synapse.events.utils import prune_event, serialize_event from synapse.util.retryutils import NotRetryingDestination +from synapse.push.action_generator import ActionGenerator + from twisted.internet import defer import itertools @@ -1113,6 +1115,11 @@ class FederationHandler(BaseHandler): current_state=current_state, ) + action_generator = ActionGenerator(self.store) + yield action_generator.handle_event(serialize_event( + event, self.clock.time_msec()) + ) + defer.returnValue((context, event_stream_id, max_stream_id)) @defer.inlineCallbacks @@ -1139,6 +1146,12 @@ class FederationHandler(BaseHandler): is_new_state=(not outliers and not backfilled), ) + for ev_info in event_infos: + action_generator = ActionGenerator(self.store) + yield action_generator.handle_event(serialize_event( + ev_info["event"], self.clock.time_msec()) + ) + @defer.inlineCallbacks def _persist_auth_tree(self, auth_events, state, event): """Checks the auth chain is valid (and passes auth checks) for the diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py new file mode 100644 index 000000000..508eeaed9 --- /dev/null +++ b/synapse/push/action_generator.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +import push_rule_evaluator + +import logging + + +logger = logging.getLogger(__name__) + + +class ActionGenerator: + def __init__(self, store): + self.store = store + # really we want to get all user ids and all profile tags too, + # since we want the actions for each profile tag for every user and + # also actions for a client with no profile tag for each user. + # Currently the event stream doesn't support profile tags on an + # event stream, so we just run the rules for a client with no profile + # tag (ie. we just need all the users). + + @defer.inlineCallbacks + def handle_event(self, event): + users = yield self.store.get_users_in_room(event['room_id']) + logger.error("users in room: %r", users) + + for uid in users: + evaluator = yield push_rule_evaluator.\ + evaluator_for_user_name_and_profile_tag( + uid, None, event['room_id'], self.store + ) + actions = yield evaluator.actions_for_event(event) + logger.info("actions for user %s: %s", uid, actions) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 2e5eddd25..f230faa25 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -291,6 +291,18 @@ class RegistrationStore(SQLBaseStore): defer.returnValue(ret['user_id']) defer.returnValue(None) + @defer.inlineCallbacks + def get_all_user_ids(self): + """Returns all user ids registered on this homeserver""" + return self.runInteraction( + "get_all_user_ids", + self._get_all_user_ids_txn + ) + + def _get_all_user_ids_txn(self, txn): + txn.execute("SELECT name from users") + return [r[0] for r in txn.fetchall()] + @defer.inlineCallbacks def count_all_users(self): """Counts all users registered on the homeserver.""" From a84a6933274c0da64c41ac494b027ab5010a4801 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 10 Dec 2015 17:18:46 +0000 Subject: [PATCH 02/33] Having consulted The Erikle, this should go at the end of on_receive_pdu, otherwise it will be triggered whenever we backfill too. --- synapse/handlers/federation.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 18289eb52..6a4269b50 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -244,6 +244,12 @@ class FederationHandler(BaseHandler): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) + if not backfilled and not event.internal_metadata.is_outlier(): + action_generator = ActionGenerator(self.store) + yield action_generator.handle_event(serialize_event( + event, self.clock.time_msec()) + ) + @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): event_to_state = yield self.store.get_state_for_events( @@ -1115,11 +1121,6 @@ class FederationHandler(BaseHandler): current_state=current_state, ) - action_generator = ActionGenerator(self.store) - yield action_generator.handle_event(serialize_event( - event, self.clock.time_msec()) - ) - defer.returnValue((context, event_stream_id, max_stream_id)) @defer.inlineCallbacks @@ -1146,12 +1147,6 @@ class FederationHandler(BaseHandler): is_new_state=(not outliers and not backfilled), ) - for ev_info in event_infos: - action_generator = ActionGenerator(self.store) - yield action_generator.handle_event(serialize_event( - ev_info["event"], self.clock.time_msec()) - ) - @defer.inlineCallbacks def _persist_auth_tree(self, auth_events, state, event): """Checks the auth chain is valid (and passes auth checks) for the From aa667ee396c473f497b084655d47b2a9520a538a Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 10 Dec 2015 17:51:15 +0000 Subject: [PATCH 03/33] Save event actions to the db --- synapse/push/action_generator.py | 6 ++- synapse/storage/__init__.py | 2 + synapse/storage/event_actions.py | 41 +++++++++++++++++++ .../storage/schema/delta/27/event_actions.sql | 25 +++++++++++ 4 files changed, 72 insertions(+), 2 deletions(-) create mode 100644 synapse/storage/event_actions.py create mode 100644 synapse/storage/schema/delta/27/event_actions.sql diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 508eeaed9..870c68a0c 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -19,7 +19,6 @@ import push_rule_evaluator import logging - logger = logging.getLogger(__name__) @@ -42,6 +41,9 @@ class ActionGenerator: evaluator = yield push_rule_evaluator.\ evaluator_for_user_name_and_profile_tag( uid, None, event['room_id'], self.store - ) + ) actions = yield evaluator.actions_for_event(event) logger.info("actions for user %s: %s", uid, actions) + self.store.set_actions_for_event( + event['event_id'], uid, None, actions + ) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index c46b653f1..a112dd237 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -33,6 +33,7 @@ from .pusher import PusherStore from .push_rule import PushRuleStore from .media_repository import MediaRepositoryStore from .rejections import RejectionsStore +from .event_actions import EventActionsStore from .state import StateStore from .signatures import SignatureStore @@ -75,6 +76,7 @@ class DataStore(RoomMemberStore, RoomStore, SearchStore, TagsStore, AccountDataStore, + EventActionsStore ): def __init__(self, hs): diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py new file mode 100644 index 000000000..593b1714c --- /dev/null +++ b/synapse/storage/event_actions.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore +from twisted.internet import defer + +import logging +import simplejson as json + +logger = logging.getLogger(__name__) + + +class EventActionsStore(SQLBaseStore): + @defer.inlineCallbacks + def set_actions_for_event(self, event_id, user_id, profile_tag, actions): + actionsJson = json.dumps(actions) + + ret = yield self.runInteraction( + "_set_actions_for_event", + self._simple_upsert_txn, + EventActionsTable.table_name, + {'event_id': event_id, 'user_id': user_id, 'profile_tag': profile_tag}, + {'actions': actionsJson} + ) + defer.returnValue(ret) + + +class EventActionsTable(object): + table_name = "event_actions" diff --git a/synapse/storage/schema/delta/27/event_actions.sql b/synapse/storage/schema/delta/27/event_actions.sql new file mode 100644 index 000000000..1246823a0 --- /dev/null +++ b/synapse/storage/schema/delta/27/event_actions.sql @@ -0,0 +1,25 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS event_actions( + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + profile_tag VARCHAR(32), + actions TEXT NOT NULL, + CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (event_id, user_id, profile_tag) +); + + +CREATE INDEX event_actions_event_id_user_id_profile_tag on event_actions(event_id, user_id, profile_tag); From 5e909c73d76cd56e3eac91635a99405182dcac3c Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 10 Dec 2015 18:40:28 +0000 Subject: [PATCH 04/33] Store nothing instead of ['dont_notify'] for events with no notification required: much as it would be nice to be able to tell between the event not having been processed and there being no notification for it, this isn't worth filling up the table with ['dont_notify'] I think. Consequently treat the empty actions array as dont_notify and filter dont_notify out of the result. --- synapse/push/__init__.py | 18 +++--------------- synapse/push/action_generator.py | 8 ++++---- synapse/push/push_rule_evaluator.py | 9 +++++++-- 3 files changed, 14 insertions(+), 21 deletions(-) diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index e7c964bcd..d5928c1d2 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -157,21 +157,7 @@ class Pusher(object): actions = yield rule_evaluator.actions_for_event(single_event) tweaks = rule_evaluator.tweaks_for_actions(actions) - if len(actions) == 0: - logger.warn("Empty actions! Using default action.") - actions = Pusher.DEFAULT_ACTIONS - - if 'notify' not in actions and 'dont_notify' not in actions: - logger.warn("Neither notify nor dont_notify in actions: adding default") - actions.extend(Pusher.DEFAULT_ACTIONS) - - if 'dont_notify' in actions: - logger.debug( - "%s for %s: dont_notify", - single_event['event_id'], self.user_name - ) - processed = True - else: + if 'notify' in actions: rejected = yield self.dispatch_push(single_event, tweaks) self.has_unread = True if isinstance(rejected, list) or isinstance(rejected, tuple): @@ -192,6 +178,8 @@ class Pusher(object): yield self.hs.get_pusherpool().remove_pusher( self.app_id, pk, self.user_name ) + else: + processed = True if not self.alive: return diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 870c68a0c..a72a7d703 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -35,7 +35,6 @@ class ActionGenerator: @defer.inlineCallbacks def handle_event(self, event): users = yield self.store.get_users_in_room(event['room_id']) - logger.error("users in room: %r", users) for uid in users: evaluator = yield push_rule_evaluator.\ @@ -44,6 +43,7 @@ class ActionGenerator: ) actions = yield evaluator.actions_for_event(event) logger.info("actions for user %s: %s", uid, actions) - self.store.set_actions_for_event( - event['event_id'], uid, None, actions - ) + if len(actions): + self.store.set_actions_for_event( + event['event_id'], uid, None, actions + ) diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 92c7fd048..420476fd0 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -43,7 +43,7 @@ def evaluator_for_user_name_and_profile_tag(user_name, profile_tag, room_id, sto class PushRuleEvaluator: - DEFAULT_ACTIONS = ['dont_notify'] + DEFAULT_ACTIONS = [] INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$") def __init__(self, user_name, profile_tag, raw_rules, enabled_map, room_id, @@ -85,7 +85,7 @@ class PushRuleEvaluator: """ if ev['user_id'] == self.user_name: # let's assume you probably know about messages you sent yourself - defer.returnValue(['dont_notify']) + defer.returnValue([]) room_id = ev['room_id'] @@ -131,6 +131,11 @@ class PushRuleEvaluator: "%s matches for user %s, event %s", r['rule_id'], self.user_name, ev['event_id'] ) + + # filter out dont_notify as we treat an empty actions list + # as dont_notify, and this doesn't take up a row in our database + actions = [x for x in actions if x != 'dont_notify'] + defer.returnValue(actions) logger.info( From 42ad49f5b75c2c645c4060026c21c5572f5b1063 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Dec 2015 18:42:09 +0000 Subject: [PATCH 05/33] still very WIP, but now sends unread_notifications_count in the room object on sync (only actually corrrect in a full sync: hardcoded to 0 in incremental syncs). --- synapse/handlers/sync.py | 26 +++++++++ synapse/push/action_generator.py | 2 +- synapse/rest/client/v2_alpha/sync.py | 1 + synapse/storage/event_actions.py | 53 ++++++++++++++++++- .../storage/schema/delta/27/event_actions.sql | 5 +- 5 files changed, 82 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 24c2b2fad..6d193a10c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -52,6 +52,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ "state", # dict[(str, str), FrozenEvent] "ephemeral", "account_data", + "unread_notification_count", ])): __slots__ = [] @@ -64,6 +65,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ or self.state or self.ephemeral or self.account_data + or self.unread_notification_count > 0 ) @@ -161,6 +163,18 @@ class SyncHandler(BaseHandler): else: return self.incremental_sync_with_gap(sync_config, since_token) + def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_room): + if room_id not in ephemeral_by_room: + return None + for e in ephemeral_by_room[room_id]: + if e['type'] != 'm.receipt': + continue + for receipt_event_id,val in e['content'].items(): + if 'm.read' in val: + if user_id in val['m.read']: + return receipt_event_id + return None + @defer.inlineCallbacks def full_state_sync(self, sync_config, timeline_since_token): """Get a sync for a client which is starting without any state. @@ -265,6 +279,16 @@ class SyncHandler(BaseHandler): room_id, sync_config, now_token, since_token=timeline_since_token ) + last_unread_event_id = self.last_read_event_id_for_room_and_user( + room_id, sync_config.user.to_string(), ephemeral_by_room + ) + + notifs = [] + if last_unread_event_id: + notifs = yield self.store.get_unread_event_actions_by_room( + room_id, last_unread_event_id + ) + current_state = yield self.get_state_at(room_id, now_token) defer.returnValue(JoinedSyncResult( @@ -275,6 +299,7 @@ class SyncHandler(BaseHandler): account_data=self.account_data_for_room( room_id, tags_by_room, account_data_by_room ), + unread_notification_count=len(notifs) )) def account_data_for_user(self, account_data): @@ -509,6 +534,7 @@ class SyncHandler(BaseHandler): account_data=self.account_data_for_room( room_id, tags_by_room, account_data_by_room ), + unread_notification_count=0 ) logger.debug("Result for room %s: %r", room_id, room_sync) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index a72a7d703..1c7cd3166 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -45,5 +45,5 @@ class ActionGenerator: logger.info("actions for user %s: %s", uid, actions) if len(actions): self.store.set_actions_for_event( - event['event_id'], uid, None, actions + event, uid, None, actions ) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index f0a637a6d..4ca10732c 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -304,6 +304,7 @@ class SyncRestServlet(RestServlet): }, "state": {"events": serialized_state}, "account_data": {"events": account_data}, + "unread_notification_count": room.unread_notification_count } if joined: diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py index 593b1714c..40ac8e2d2 100644 --- a/synapse/storage/event_actions.py +++ b/synapse/storage/event_actions.py @@ -24,18 +24,67 @@ logger = logging.getLogger(__name__) class EventActionsStore(SQLBaseStore): @defer.inlineCallbacks - def set_actions_for_event(self, event_id, user_id, profile_tag, actions): + def set_actions_for_event(self, event, user_id, profile_tag, actions): actionsJson = json.dumps(actions) ret = yield self.runInteraction( "_set_actions_for_event", self._simple_upsert_txn, EventActionsTable.table_name, - {'event_id': event_id, 'user_id': user_id, 'profile_tag': profile_tag}, + { + 'room_id': event['room_id'], + 'event_id': event['event_id'], + 'user_id': user_id, + 'profile_tag': profile_tag + }, {'actions': actionsJson} ) defer.returnValue(ret) + @defer.inlineCallbacks + def get_unread_event_actions_by_room(self, room_id, last_read_event_id): + #events = yield self._get_events( + # [last_read_event_id], + # check_redacted=False + #) + + def _get_unread_event_actions_by_room(txn): + sql = ( + "SELECT stream_ordering, topological_ordering" + " FROM events" + " WHERE room_id = ? AND event_id = ?" + ) + txn.execute( + sql, (room_id, last_read_event_id) + ) + results = txn.fetchall() + if len(results) == 0: + return [] + + stream_ordering = results[0][0] + topological_ordering = results[0][1] + + sql = ( + "SELECT ea.actions" + " FROM event_actions ea, events e" + " WHERE ea.room_id = e.room_id" + " AND ea.event_id = e.event_id" + " AND ea.room_id = ?" + " AND (" + " e.topological_ordering > ?" + " OR (e.topological_ordering == ? AND e.stream_ordering > ?)" + ")" + ) + txn.execute(sql, + (room_id, topological_ordering, topological_ordering, stream_ordering) + ) + return txn.fetchall() + + ret = yield self.runInteraction( + "get_unread_event_actions_by_room", + _get_unread_event_actions_by_room + ) + defer.returnValue(ret) class EventActionsTable(object): table_name = "event_actions" diff --git a/synapse/storage/schema/delta/27/event_actions.sql b/synapse/storage/schema/delta/27/event_actions.sql index 1246823a0..bbdaee990 100644 --- a/synapse/storage/schema/delta/27/event_actions.sql +++ b/synapse/storage/schema/delta/27/event_actions.sql @@ -14,12 +14,13 @@ */ CREATE TABLE IF NOT EXISTS event_actions( + room_id TEXT NOT NULL, event_id TEXT NOT NULL, user_id TEXT NOT NULL, profile_tag VARCHAR(32), actions TEXT NOT NULL, - CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (event_id, user_id, profile_tag) + CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag) ); -CREATE INDEX event_actions_event_id_user_id_profile_tag on event_actions(event_id, user_id, profile_tag); +CREATE INDEX event_actions_room_id_event_id_user_id_profile_tag on event_actions(room_id, event_id, user_id, profile_tag); From 413d0d6a2404c579b1fa39ece9a698f9df8349db Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 18 Dec 2015 17:47:00 +0000 Subject: [PATCH 06/33] Make unread notification count sending work: put the correct count in incremental syncs too, where necessary, and fix silly bugs like only select the event actions for that user... --- synapse/handlers/sync.py | 48 ++++++++++++++++++++++++-------- synapse/storage/event_actions.py | 21 ++++++++------ 2 files changed, 49 insertions(+), 20 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6d193a10c..44420a063 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -65,7 +65,8 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ or self.state or self.ephemeral or self.account_data - or self.unread_notification_count > 0 + # nb the notification count does not, er, count: if there's nothing + # else in the result, we don't need to send it. ) @@ -279,15 +280,12 @@ class SyncHandler(BaseHandler): room_id, sync_config, now_token, since_token=timeline_since_token ) - last_unread_event_id = self.last_read_event_id_for_room_and_user( - room_id, sync_config.user.to_string(), ephemeral_by_room + notifs = yield self.unread_notifs_for_room_id( + room_id, sync_config, ephemeral_by_room ) - - notifs = [] - if last_unread_event_id: - notifs = yield self.store.get_unread_event_actions_by_room( - room_id, last_unread_event_id - ) + notif_count = None + if notifs is not None: + notif_count = len(notifs) current_state = yield self.get_state_at(room_id, now_token) @@ -299,7 +297,7 @@ class SyncHandler(BaseHandler): account_data=self.account_data_for_room( room_id, tags_by_room, account_data_by_room ), - unread_notification_count=len(notifs) + unread_notification_count=notif_count )) def account_data_for_user(self, account_data): @@ -441,6 +439,10 @@ class SyncHandler(BaseHandler): ) now_token = now_token.copy_and_replace("presence_key", presence_key) + _, all_ephemeral_by_room = yield self.ephemeral_by_room( + sync_config, now_token + ) + now_token, ephemeral_by_room = yield self.ephemeral_by_room( sync_config, now_token, since_token ) @@ -514,6 +516,13 @@ class SyncHandler(BaseHandler): else: prev_batch = now_token + notifs = yield self.unread_notifs_for_room_id( + room_id, sync_config, all_ephemeral_by_room + ) + notif_count = None + if notifs is not None: + notif_count = len(notifs) + just_joined = yield self.check_joined_room(sync_config, state) if just_joined: logger.debug("User has just joined %s: needs full state", @@ -534,7 +543,7 @@ class SyncHandler(BaseHandler): account_data=self.account_data_for_room( room_id, tags_by_room, account_data_by_room ), - unread_notification_count=0 + unread_notification_count=notif_count ) logger.debug("Result for room %s: %r", room_id, room_sync) @@ -805,3 +814,20 @@ class SyncHandler(BaseHandler): if join_event.content["membership"] == Membership.JOIN: return True return False + + @defer.inlineCallbacks + def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room): + last_unread_event_id = self.last_read_event_id_for_room_and_user( + room_id, sync_config.user.to_string(), ephemeral_by_room + ) + + notifs = [] + if last_unread_event_id: + notifs = yield self.store.get_unread_event_actions_by_room_for_user( + room_id, sync_config.user.to_string(), last_unread_event_id + ) + else: + # There is no new information in this period, so your notification + # count is whatever it was last time. + defer.returnValue(None) + defer.returnValue(notifs) \ No newline at end of file diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py index 40ac8e2d2..f7fe78e55 100644 --- a/synapse/storage/event_actions.py +++ b/synapse/storage/event_actions.py @@ -42,12 +42,9 @@ class EventActionsStore(SQLBaseStore): defer.returnValue(ret) @defer.inlineCallbacks - def get_unread_event_actions_by_room(self, room_id, last_read_event_id): - #events = yield self._get_events( - # [last_read_event_id], - # check_redacted=False - #) - + def get_unread_event_actions_by_room_for_user( + self, room_id, user_id, last_read_event_id + ): def _get_unread_event_actions_by_room(txn): sql = ( "SELECT stream_ordering, topological_ordering" @@ -65,10 +62,11 @@ class EventActionsStore(SQLBaseStore): topological_ordering = results[0][1] sql = ( - "SELECT ea.actions" + "SELECT ea.event_id, ea.actions" " FROM event_actions ea, events e" " WHERE ea.room_id = e.room_id" " AND ea.event_id = e.event_id" + " AND ea.user_id = ?" " AND ea.room_id = ?" " AND (" " e.topological_ordering > ?" @@ -76,9 +74,14 @@ class EventActionsStore(SQLBaseStore): ")" ) txn.execute(sql, - (room_id, topological_ordering, topological_ordering, stream_ordering) + ( + user_id, room_id, + topological_ordering, topological_ordering, stream_ordering + ) ) - return txn.fetchall() + return [ + { "event_id": row[0], "actions": row[1] } for row in txn.fetchall() + ] ret = yield self.runInteraction( "get_unread_event_actions_by_room", From b131fb1fe26da50ce30656cadbb24e72bd7ecdf9 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 18 Dec 2015 18:04:45 +0000 Subject: [PATCH 07/33] add list of things I want to fix with this branch --- problems_with_this_branch | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 problems_with_this_branch diff --git a/problems_with_this_branch b/problems_with_this_branch new file mode 100644 index 000000000..9c5df5521 --- /dev/null +++ b/problems_with_this_branch @@ -0,0 +1,5 @@ + * processing push by bluntly running the rules for each user + * processing push twice because the pushers still run the rules themselves rather than pulling out of the event_actions table + * consequently converting events back & forth between objects & dicts when processing + * fetching all the current ephemeral events for each room on an incremental sync to get the user's read receipt + * doing 2 more db queries per room to get the unread notif count From 091c545c4fb38f662b61cb46779a813f70971e4f Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 21 Dec 2015 10:14:57 +0000 Subject: [PATCH 08/33] pep8 --- synapse/handlers/sync.py | 6 +++--- synapse/storage/event_actions.py | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 44420a063..20b2a2595 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -170,7 +170,7 @@ class SyncHandler(BaseHandler): for e in ephemeral_by_room[room_id]: if e['type'] != 'm.receipt': continue - for receipt_event_id,val in e['content'].items(): + for receipt_event_id, val in e['content'].items(): if 'm.read' in val: if user_id in val['m.read']: return receipt_event_id @@ -281,7 +281,7 @@ class SyncHandler(BaseHandler): ) notifs = yield self.unread_notifs_for_room_id( - room_id, sync_config, ephemeral_by_room + room_id, sync_config, ephemeral_by_room ) notif_count = None if notifs is not None: @@ -830,4 +830,4 @@ class SyncHandler(BaseHandler): # There is no new information in this period, so your notification # count is whatever it was last time. defer.returnValue(None) - defer.returnValue(notifs) \ No newline at end of file + defer.returnValue(notifs) diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py index f7fe78e55..fbd0a4227 100644 --- a/synapse/storage/event_actions.py +++ b/synapse/storage/event_actions.py @@ -73,14 +73,13 @@ class EventActionsStore(SQLBaseStore): " OR (e.topological_ordering == ? AND e.stream_ordering > ?)" ")" ) - txn.execute(sql, - ( - user_id, room_id, - topological_ordering, topological_ordering, stream_ordering - ) + txn.execute(sql, ( + user_id, room_id, + topological_ordering, topological_ordering, stream_ordering + ) ) return [ - { "event_id": row[0], "actions": row[1] } for row in txn.fetchall() + {"event_id": row[0], "actions": row[1]} for row in txn.fetchall() ] ret = yield self.runInteraction( @@ -89,5 +88,6 @@ class EventActionsStore(SQLBaseStore): ) defer.returnValue(ret) + class EventActionsTable(object): table_name = "event_actions" From f73f154ec2c8ffdc49270d3ccaf3053f915800f3 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 21 Dec 2015 15:28:54 +0000 Subject: [PATCH 09/33] Only run pushers for users on this hs! --- synapse/handlers/_base.py | 2 +- synapse/handlers/federation.py | 2 +- synapse/push/action_generator.py | 8 +++++++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index a8e8c4f5a..24c4c6269 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -267,7 +267,7 @@ class BaseHandler(object): event, context=context ) - action_generator = ActionGenerator(self.store) + action_generator = ActionGenerator(self.hs, self.store) yield action_generator.handle_event(serialize_event( event, self.clock.time_msec() )) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6a4269b50..6525bde43 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -245,7 +245,7 @@ class FederationHandler(BaseHandler): yield user_joined_room(self.distributor, user, event.room_id) if not backfilled and not event.internal_metadata.is_outlier(): - action_generator = ActionGenerator(self.store) + action_generator = ActionGenerator(self.hs, self.store) yield action_generator.handle_event(serialize_event( event, self.clock.time_msec()) ) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 1c7cd3166..6e107ca79 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -15,6 +15,8 @@ from twisted.internet import defer +from synapse.types import UserID + import push_rule_evaluator import logging @@ -23,7 +25,8 @@ logger = logging.getLogger(__name__) class ActionGenerator: - def __init__(self, store): + def __init__(self, hs, store): + self.hs = hs self.store = store # really we want to get all user ids and all profile tags too, # since we want the actions for each profile tag for every user and @@ -37,6 +40,9 @@ class ActionGenerator: users = yield self.store.get_users_in_room(event['room_id']) for uid in users: + if not self.hs.is_mine(UserID.from_string(uid)): + continue + evaluator = yield push_rule_evaluator.\ evaluator_for_user_name_and_profile_tag( uid, None, event['room_id'], self.store From 65c451cb3878fb41f28a2adecd638894e18f5343 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 15:19:34 +0000 Subject: [PATCH 10/33] Add bulk push rule evaluator which actually still evaluates rules one by one, but does far fewer db queries to fetch the rules --- synapse/push/action_generator.py | 26 +++---- synapse/push/bulk_push_rule_evaluator.py | 99 ++++++++++++++++++++++++ synapse/push/push_rule_evaluator.py | 13 ++-- synapse/storage/push_rule.py | 41 ++++++++++ 4 files changed, 158 insertions(+), 21 deletions(-) create mode 100644 synapse/push/bulk_push_rule_evaluator.py diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 6e107ca79..2ad5f82da 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -15,9 +15,7 @@ from twisted.internet import defer -from synapse.types import UserID - -import push_rule_evaluator +import bulk_push_rule_evaluator import logging @@ -39,17 +37,13 @@ class ActionGenerator: def handle_event(self, event): users = yield self.store.get_users_in_room(event['room_id']) - for uid in users: - if not self.hs.is_mine(UserID.from_string(uid)): - continue + bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( + event['room_id'], self.hs, self.store + ) - evaluator = yield push_rule_evaluator.\ - evaluator_for_user_name_and_profile_tag( - uid, None, event['room_id'], self.store - ) - actions = yield evaluator.actions_for_event(event) - logger.info("actions for user %s: %s", uid, actions) - if len(actions): - self.store.set_actions_for_event( - event, uid, None, actions - ) + actions_by_user = bulk_evaluator.action_for_event_by_user(event) + + for uid,actions in actions_by_user.items(): + self.store.set_actions_for_event( + event, uid, None, actions + ) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py new file mode 100644 index 000000000..f531d2edc --- /dev/null +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import simplejson as json + +from twisted.internet import defer + +from synapse.types import UserID + +import baserules +from push_rule_evaluator import PushRuleEvaluator + +logger = logging.getLogger(__name__) + + +def decode_rule_json(rule): + rule['conditions'] = json.loads(rule['conditions']) + rule['actions'] = json.loads(rule['actions']) + return rule + + +@defer.inlineCallbacks +def evaluator_for_room_id(room_id, hs, store): + users = yield store.get_users_in_room(room_id) + rules_by_user = yield store.bulk_get_push_rules(users) + rules_by_user = { + uid: baserules.list_with_base_rules( + [decode_rule_json(rule_list) for rule_list in rules_by_user[uid]] + if uid in rules_by_user else [], + UserID.from_string(uid) + ) + for uid in users + } + member_events = yield store.get_current_state( + room_id=room_id, + event_type='m.room.member', + ) + display_names = {} + for ev in member_events: + if ev.content.get("displayname"): + display_names[ev.state_key] = ev.content.get("displayname") + + defer.returnValue(BulkPushRuleEvaluator( + room_id, rules_by_user, display_names, users + )) + + +class BulkPushRuleEvaluator: + def __init__(self, room_id, rules_by_user, display_names, users_in_room): + self.room_id = room_id + self.rules_by_user = rules_by_user + self.display_names = display_names + self.users_in_room = users_in_room + + def action_for_event_by_user(self, event): + actions_by_user = {} + + for uid, rules in self.rules_by_user.items(): + display_name = None + if uid in self.display_names: + display_name = self.display_names[uid] + + for rule in rules: + if 'enabled' in rule and not rule['enabled']: + continue + + # XXX: profile tags + if BulkPushRuleEvaluator.event_matches_rule( + event, rule, + display_name, len(self.users_in_room), None + ): + actions = [x for x in rule['actions'] if x != 'dont_notify'] + if len(actions) > 0: + actions_by_user[uid] = actions + break + return actions_by_user + + @staticmethod + def event_matches_rule(event, rule, + display_name, room_member_count, profile_tag): + matches = True + for cond in rule['conditions']: + matches &= PushRuleEvaluator._event_fulfills_condition( + event, cond, display_name, room_member_count, profile_tag + ) + return matches \ No newline at end of file diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 420476fd0..40c7622ec 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -113,7 +113,8 @@ class PushRuleEvaluator: for c in conditions: matches &= self._event_fulfills_condition( ev, c, display_name=my_display_name, - room_member_count=room_member_count + room_member_count=room_member_count, + profile_tag=self.profile_tag ) logger.debug( "Rule %s %s", @@ -156,16 +157,18 @@ class PushRuleEvaluator: re.sub(r'\\\-', '-', x.group(2)))), r) return r - def _event_fulfills_condition(self, ev, condition, display_name, room_member_count): + @staticmethod + def _event_fulfills_condition(ev, condition, + display_name, room_member_count, profile_tag): if condition['kind'] == 'event_match': if 'pattern' not in condition: logger.warn("event_match condition with no pattern") return False # XXX: optimisation: cache our pattern regexps if condition['key'] == 'content.body': - r = r'\b%s\b' % self._glob_to_regexp(condition['pattern']) + r = r'\b%s\b' % PushRuleEvaluator._glob_to_regexp(condition['pattern']) else: - r = r'^%s$' % self._glob_to_regexp(condition['pattern']) + r = r'^%s$' % PushRuleEvaluator._glob_to_regexp(condition['pattern']) val = _value_for_dotted_key(condition['key'], ev) if val is None: return False @@ -174,7 +177,7 @@ class PushRuleEvaluator: elif condition['kind'] == 'device': if 'profile_tag' not in condition: return True - return condition['profile_tag'] == self.profile_tag + return condition['profile_tag'] == profile_tag elif condition['kind'] == 'contains_display_name': # This is special because display names can be different diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 5305b7e12..9dec4aa68 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -55,6 +55,47 @@ class PushRuleStore(SQLBaseStore): r['rule_id']: False if r['enabled'] == 0 else True for r in results }) + @defer.inlineCallbacks + def bulk_get_push_rules(self, user_ids): + batch_size = 100 + + def f(txn, user_ids_to_fetch): + sql = ( + "SELECT " + + ",".join(map(lambda x: "pr."+x, PushRuleTable.fields)) + + " FROM " + PushRuleTable.table_name + " pr " + + " LEFT JOIN " + PushRuleEnableTable.table_name + " pre " + + " ON pr.user_name = pre.user_name and pr.rule_id = pre.rule_id " + + " WHERE pr.user_name " + + " IN (" + ",".join(["?" for _ in user_ids_to_fetch]) + ")" + " AND (pre.enabled is null or pre.enabled = 1)" + " ORDER BY pr.user_name, pr.priority_class DESC, pr.priority DESC" + ) + txn.execute(sql, user_ids_to_fetch) + return txn.fetchall() + + results = {} + + batch_start = 0 + while batch_start < len(user_ids): + batch_end = max(len(user_ids), batch_size) + batch_user_ids = user_ids[batch_start:batch_end] + batch_start = batch_end + + rows = yield self.runInteraction( + "bulk_get_push_rules", f, batch_user_ids + ) + + for r in rows: + rawdict = { + PushRuleTable.fields[i]: r[i] for i in range(len(r)) + } + + if rawdict['user_name'] not in results: + results[rawdict['user_name']] = [] + results[rawdict['user_name']].append(rawdict) + defer.returnValue(results) + @defer.inlineCallbacks def add_push_rule(self, before, after, **kwargs): vals = kwargs From 77f06856b612e2905b5b69f6f2de75ddd348adfd Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 15:22:26 +0000 Subject: [PATCH 11/33] clarify problems --- problems_with_this_branch | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/problems_with_this_branch b/problems_with_this_branch index 9c5df5521..baed6f5c6 100644 --- a/problems_with_this_branch +++ b/problems_with_this_branch @@ -2,4 +2,4 @@ * processing push twice because the pushers still run the rules themselves rather than pulling out of the event_actions table * consequently converting events back & forth between objects & dicts when processing * fetching all the current ephemeral events for each room on an incremental sync to get the user's read receipt - * doing 2 more db queries per room to get the unread notif count + * doing 2 more db queries per room at sync time to get the unread notif count From 4c8f6a7e427cc0e22ff1a19c3f1d9da0f9438f18 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 17:04:31 +0000 Subject: [PATCH 12/33] Insert push actions in a single db query rather than one per user/profile_tag --- synapse/push/action_generator.py | 10 ++++++---- synapse/storage/event_actions.py | 31 ++++++++++++++++++------------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 2ad5f82da..148b1bda8 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -43,7 +43,9 @@ class ActionGenerator: actions_by_user = bulk_evaluator.action_for_event_by_user(event) - for uid,actions in actions_by_user.items(): - self.store.set_actions_for_event( - event, uid, None, actions - ) + yield self.store.set_actions_for_event_and_users( + event, + [ + (uid, None, actions) for uid, actions in actions_by_user.items() + ] + ) diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py index fbd0a4227..3efa445c1 100644 --- a/synapse/storage/event_actions.py +++ b/synapse/storage/event_actions.py @@ -24,22 +24,27 @@ logger = logging.getLogger(__name__) class EventActionsStore(SQLBaseStore): @defer.inlineCallbacks - def set_actions_for_event(self, event, user_id, profile_tag, actions): - actionsJson = json.dumps(actions) - - ret = yield self.runInteraction( - "_set_actions_for_event", - self._simple_upsert_txn, - EventActionsTable.table_name, - { + def set_actions_for_event_and_users(self, event, tuples): + """ + :param event: the event set actions for + :param tuples: list of tuples of (user_id, profile_tag, actions) + """ + values = [] + for uid, profile_tag, actions in tuples: + values.append({ 'room_id': event['room_id'], 'event_id': event['event_id'], - 'user_id': user_id, - 'profile_tag': profile_tag - }, - {'actions': actionsJson} + 'user_id': uid, + 'profile_tag': profile_tag, + 'actions': json.dumps(actions) + }) + + yield self.runInteraction( + "set_actions_for_event_and_users", + self._simple_insert_many_txn, + EventActionsTable.table_name, + values ) - defer.returnValue(ret) @defer.inlineCallbacks def get_unread_event_actions_by_room_for_user( From 3fbb0317453f00f4d84ebc52145d8afb6490909f Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 17:13:47 +0000 Subject: [PATCH 13/33] Remove the list of problems (moved to jira issues) --- problems_with_this_branch | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 problems_with_this_branch diff --git a/problems_with_this_branch b/problems_with_this_branch deleted file mode 100644 index baed6f5c6..000000000 --- a/problems_with_this_branch +++ /dev/null @@ -1,5 +0,0 @@ - * processing push by bluntly running the rules for each user - * processing push twice because the pushers still run the rules themselves rather than pulling out of the event_actions table - * consequently converting events back & forth between objects & dicts when processing - * fetching all the current ephemeral events for each room on an incremental sync to get the user's read receipt - * doing 2 more db queries per room at sync time to get the unread notif count From 5645d9747b17e9d119cc7badd7c2abe3c157a1a6 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 17:19:22 +0000 Subject: [PATCH 14/33] Add some comments to areas that could be optimised. --- synapse/handlers/sync.py | 3 +++ synapse/push/__init__.py | 4 +++- synapse/push/bulk_push_rule_evaluator.py | 8 ++++++++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4cbb43a31..fa5e954e0 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -447,6 +447,9 @@ class SyncHandler(BaseHandler): ) now_token = now_token.copy_and_replace("presence_key", presence_key) + # We now fetch all ephemeral events for this room in order to get + # this users current read receipt. This could almost certainly be + # optimised. _, all_ephemeral_by_room = yield self.ephemeral_by_room( sync_config, now_token ) diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index d5928c1d2..635dedd52 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -26,7 +26,9 @@ import random logger = logging.getLogger(__name__) - +# Pushers could now be moved to pull out of the event_actions table instead +# of listening on the event stream: this would avoid them having to run the +# rules again. class Pusher(object): INITIAL_BACKOFF = 1000 MAX_BACKOFF = 60 * 60 * 1000 diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index f531d2edc..1c0fa72b2 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -59,6 +59,14 @@ def evaluator_for_room_id(room_id, hs, store): class BulkPushRuleEvaluator: + """ + Runs push rules for all users in a room. + This is faster than running PushRuleEvaluator for each user because it + fetches all the rules for all the users in one (batched) db query + rarher than doing multiple queries per-user. It currently uses + the same logic to run the actual rules, but could be optimised further + (see https://matrix.org/jira/browse/SYN-562) + """ def __init__(self, room_id, rules_by_user, display_names, users_in_room): self.room_id = room_id self.rules_by_user = rules_by_user From 9b4cd0cd0f31803657018bf0ac9178787d796912 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 17:25:09 +0000 Subject: [PATCH 15/33] pep8 & unused variable --- synapse/push/__init__.py | 1 + synapse/push/action_generator.py | 2 -- synapse/push/bulk_push_rule_evaluator.py | 4 ++-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 635dedd52..250f22a16 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -26,6 +26,7 @@ import random logger = logging.getLogger(__name__) + # Pushers could now be moved to pull out of the event_actions table instead # of listening on the event stream: this would avoid them having to run the # rules again. diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 148b1bda8..00f518f60 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -35,8 +35,6 @@ class ActionGenerator: @defer.inlineCallbacks def handle_event(self, event): - users = yield self.store.get_users_in_room(event['room_id']) - bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( event['room_id'], self.hs, self.store ) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 1c0fa72b2..c489bfc8d 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -54,7 +54,7 @@ def evaluator_for_room_id(room_id, hs, store): display_names[ev.state_key] = ev.content.get("displayname") defer.returnValue(BulkPushRuleEvaluator( - room_id, rules_by_user, display_names, users + room_id, rules_by_user, display_names, users )) @@ -104,4 +104,4 @@ class BulkPushRuleEvaluator: matches &= PushRuleEvaluator._event_fulfills_condition( event, cond, display_name, room_member_count, profile_tag ) - return matches \ No newline at end of file + return matches From d79e90f078c83314de3dc469770750dd2585e255 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 17:56:56 +0000 Subject: [PATCH 16/33] Add mocks to make tests work again --- tests/handlers/test_federation.py | 7 +++++++ tests/handlers/test_room.py | 9 +++++++++ 2 files changed, 16 insertions(+) diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index d392c2301..a4758c03d 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -49,6 +49,10 @@ class FederationTestCase(unittest.TestCase): "get_destination_retry_timings", "set_destination_retry_timings", "have_events", + "get_users_in_room", + "bulk_get_push_rules", + "get_current_state", + "set_actions_for_event_and_users", ]), resource_for_federation=NonCallableMock(), http_client=NonCallableMock(spec_set=[]), @@ -85,6 +89,9 @@ class FederationTestCase(unittest.TestCase): self.datastore.persist_event.return_value = defer.succeed((1,1)) self.datastore.get_room.return_value = defer.succeed(True) + self.datastore.get_users_in_room.return_value = ["@a:b"] + self.datastore.bulk_get_push_rules.return_value = {} + self.datastore.get_current_state.return_value = {} self.auth.check_host_in_room.return_value = defer.succeed(True) retry_timings_res = { diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index 2a7553f98..ba20b3194 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -43,6 +43,10 @@ class RoomMemberHandlerTestCase(unittest.TestCase): "store_room", "get_latest_events_in_room", "add_event_hashes", + "get_users_in_room", + "bulk_get_push_rules", + "get_current_state", + "set_actions_for_event_and_users", ]), resource_for_federation=NonCallableMock(), http_client=NonCallableMock(spec_set=[]), @@ -90,6 +94,8 @@ class RoomMemberHandlerTestCase(unittest.TestCase): self.datastore.persist_event.return_value = (1,1) self.datastore.add_event_hashes.return_value = [] + self.datastore.get_users_in_room.return_value = ["@bob:red"] + self.datastore.bulk_get_push_rules.return_value = {} @defer.inlineCallbacks def test_invite(self): @@ -109,6 +115,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): self.datastore.get_latest_events_in_room.return_value = ( defer.succeed([]) ) + self.datastore.get_current_state.return_value = {} def annotate(_): ctx = Mock() @@ -190,6 +197,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): self.datastore.get_latest_events_in_room.return_value = ( defer.succeed([]) ) + self.datastore.get_current_state.return_value = {} def annotate(_): ctx = Mock() @@ -265,6 +273,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): self.datastore.get_latest_events_in_room.return_value = ( defer.succeed([]) ) + self.datastore.get_current_state.return_value = {} def annotate(_): ctx = Mock() From d2a92c6bdeff51136a930ccddda6e734a2a0dc25 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 18:25:04 +0000 Subject: [PATCH 17/33] Fix merge fail with anon access stuff --- synapse/handlers/sync.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b1bfdce85..f63c073a2 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -520,11 +520,11 @@ class SyncHandler(BaseHandler): # this users current read receipt. This could almost certainly be # optimised. _, all_ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token + sync_config, now_token, room_ids ) now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token, since_token + sync_config, now_token, room_ids, since_token ) rm_handler = self.hs.get_handlers().room_member_handler From 3051c9d002a467643d1ab32bc36974d2e3f84c12 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jan 2016 13:39:29 +0000 Subject: [PATCH 18/33] Address minor PR issues --- synapse/handlers/_base.py | 4 ++-- synapse/handlers/federation.py | 4 ++-- synapse/push/action_generator.py | 7 +++---- synapse/push/bulk_push_rule_evaluator.py | 2 +- synapse/storage/event_actions.py | 2 +- synapse/storage/push_rule.py | 6 +++--- synapse/storage/registration.py | 12 ------------ 7 files changed, 12 insertions(+), 25 deletions(-) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 24c4c6269..938eb29de 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -267,8 +267,8 @@ class BaseHandler(object): event, context=context ) - action_generator = ActionGenerator(self.hs, self.store) - yield action_generator.handle_event(serialize_event( + action_generator = ActionGenerator(self.store) + yield action_generator.handle_push_actions_for_event(serialize_event( event, self.clock.time_msec() )) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0b1221deb..764709b42 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -245,8 +245,8 @@ class FederationHandler(BaseHandler): yield user_joined_room(self.distributor, user, event.room_id) if not backfilled and not event.internal_metadata.is_outlier(): - action_generator = ActionGenerator(self.hs, self.store) - yield action_generator.handle_event(serialize_event( + action_generator = ActionGenerator(self.store) + yield action_generator.handle_push_actions_for_event(serialize_event( event, self.clock.time_msec()) ) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 00f518f60..4ab5d9e1b 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -23,8 +23,7 @@ logger = logging.getLogger(__name__) class ActionGenerator: - def __init__(self, hs, store): - self.hs = hs + def __init__(self, store): self.store = store # really we want to get all user ids and all profile tags too, # since we want the actions for each profile tag for every user and @@ -34,9 +33,9 @@ class ActionGenerator: # tag (ie. we just need all the users). @defer.inlineCallbacks - def handle_event(self, event): + def handle_push_actions_for_event(self, event): bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( - event['room_id'], self.hs, self.store + event['room_id'], self.store ) actions_by_user = bulk_evaluator.action_for_event_by_user(event) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index c489bfc8d..1c4e54ba4 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -33,7 +33,7 @@ def decode_rule_json(rule): @defer.inlineCallbacks -def evaluator_for_room_id(room_id, hs, store): +def evaluator_for_room_id(room_id, store): users = yield store.get_users_in_room(room_id) rules_by_user = yield store.bulk_get_push_rules(users) rules_by_user = { diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py index 3efa445c1..fa9cbe71e 100644 --- a/synapse/storage/event_actions.py +++ b/synapse/storage/event_actions.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 OpenMarket Ltd +# Copyright 2015 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 9dec4aa68..7c5123d64 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -62,12 +62,12 @@ class PushRuleStore(SQLBaseStore): def f(txn, user_ids_to_fetch): sql = ( "SELECT " + - ",".join(map(lambda x: "pr."+x, PushRuleTable.fields)) + + ",".join("pr."+x for x in PushRuleTable.fields) + " FROM " + PushRuleTable.table_name + " pr " + " LEFT JOIN " + PushRuleEnableTable.table_name + " pre " + " ON pr.user_name = pre.user_name and pr.rule_id = pre.rule_id " + " WHERE pr.user_name " + - " IN (" + ",".join(["?" for _ in user_ids_to_fetch]) + ")" + " IN (" + ",".join("?" for _ in user_ids_to_fetch) + ")" " AND (pre.enabled is null or pre.enabled = 1)" " ORDER BY pr.user_name, pr.priority_class DESC, pr.priority DESC" ) @@ -78,7 +78,7 @@ class PushRuleStore(SQLBaseStore): batch_start = 0 while batch_start < len(user_ids): - batch_end = max(len(user_ids), batch_size) + batch_end = min(len(user_ids), batch_size) batch_user_ids = user_ids[batch_start:batch_end] batch_start = batch_end diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 4676f225b..09a05b08e 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -291,18 +291,6 @@ class RegistrationStore(SQLBaseStore): defer.returnValue(ret['user_id']) defer.returnValue(None) - @defer.inlineCallbacks - def get_all_user_ids(self): - """Returns all user ids registered on this homeserver""" - return self.runInteraction( - "get_all_user_ids", - self._get_all_user_ids_txn - ) - - def _get_all_user_ids_txn(self, txn): - txn.execute("SELECT name from users") - return [r[0] for r in txn.fetchall()] - @defer.inlineCallbacks def count_all_users(self): """Counts all users registered on the homeserver.""" From c914d67cda9682331639b78190db367974e4fb8b Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jan 2016 14:05:37 +0000 Subject: [PATCH 19/33] Rename event-actions to event_push_actions as per PR request --- synapse/handlers/sync.py | 2 +- synapse/push/__init__.py | 2 +- synapse/push/action_generator.py | 2 +- synapse/storage/__init__.py | 4 ++-- ...event_actions.py => event_push_actions.py} | 20 +++++++++---------- ...ent_actions.sql => event_push_actions.sql} | 4 ++-- 6 files changed, 17 insertions(+), 17 deletions(-) rename synapse/storage/{event_actions.py => event_push_actions.py} (84%) rename synapse/storage/schema/delta/27/{event_actions.sql => event_push_actions.sql} (82%) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f63c073a2..64556c5eb 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -896,7 +896,7 @@ class SyncHandler(BaseHandler): notifs = [] if last_unread_event_id: - notifs = yield self.store.get_unread_event_actions_by_room_for_user( + notifs = yield self.store.get_unread_event_push_actions_by_room_for_user( room_id, sync_config.user.to_string(), last_unread_event_id ) else: diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 250f22a16..3ab6da062 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -27,7 +27,7 @@ import random logger = logging.getLogger(__name__) -# Pushers could now be moved to pull out of the event_actions table instead +# Pushers could now be moved to pull out of the event_push_actions table instead # of listening on the event stream: this would avoid them having to run the # rules again. class Pusher(object): diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 4ab5d9e1b..5526324a6 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -40,7 +40,7 @@ class ActionGenerator: actions_by_user = bulk_evaluator.action_for_event_by_user(event) - yield self.store.set_actions_for_event_and_users( + yield self.store.set_push_actions_for_event_and_users( event, [ (uid, None, actions) for uid, actions in actions_by_user.items() diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a112dd237..43e05f144 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -33,7 +33,7 @@ from .pusher import PusherStore from .push_rule import PushRuleStore from .media_repository import MediaRepositoryStore from .rejections import RejectionsStore -from .event_actions import EventActionsStore +from .event_push_actions import EventPushActionsStore from .state import StateStore from .signatures import SignatureStore @@ -76,7 +76,7 @@ class DataStore(RoomMemberStore, RoomStore, SearchStore, TagsStore, AccountDataStore, - EventActionsStore + EventPushActionsStore ): def __init__(self, hs): diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_push_actions.py similarity index 84% rename from synapse/storage/event_actions.py rename to synapse/storage/event_push_actions.py index fa9cbe71e..016c0adf8 100644 --- a/synapse/storage/event_actions.py +++ b/synapse/storage/event_push_actions.py @@ -22,9 +22,9 @@ import simplejson as json logger = logging.getLogger(__name__) -class EventActionsStore(SQLBaseStore): +class EventPushActionsStore(SQLBaseStore): @defer.inlineCallbacks - def set_actions_for_event_and_users(self, event, tuples): + def set_push_actions_for_event_and_users(self, event, tuples): """ :param event: the event set actions for :param tuples: list of tuples of (user_id, profile_tag, actions) @@ -42,15 +42,15 @@ class EventActionsStore(SQLBaseStore): yield self.runInteraction( "set_actions_for_event_and_users", self._simple_insert_many_txn, - EventActionsTable.table_name, + EventPushActionsTable.table_name, values ) @defer.inlineCallbacks - def get_unread_event_actions_by_room_for_user( + def get_unread_event_push_actions_by_room_for_user( self, room_id, user_id, last_read_event_id ): - def _get_unread_event_actions_by_room(txn): + def _get_unread_event_push_actions_by_room(txn): sql = ( "SELECT stream_ordering, topological_ordering" " FROM events" @@ -68,7 +68,7 @@ class EventActionsStore(SQLBaseStore): sql = ( "SELECT ea.event_id, ea.actions" - " FROM event_actions ea, events e" + " FROM event_push_actions ea, events e" " WHERE ea.room_id = e.room_id" " AND ea.event_id = e.event_id" " AND ea.user_id = ?" @@ -88,11 +88,11 @@ class EventActionsStore(SQLBaseStore): ] ret = yield self.runInteraction( - "get_unread_event_actions_by_room", - _get_unread_event_actions_by_room + "get_unread_event_push_actions_by_room", + _get_unread_event_push_actions_by_room ) defer.returnValue(ret) -class EventActionsTable(object): - table_name = "event_actions" +class EventPushActionsTable(object): + table_name = "event_push_actions" diff --git a/synapse/storage/schema/delta/27/event_actions.sql b/synapse/storage/schema/delta/27/event_push_actions.sql similarity index 82% rename from synapse/storage/schema/delta/27/event_actions.sql rename to synapse/storage/schema/delta/27/event_push_actions.sql index bbdaee990..bdf6ae3f2 100644 --- a/synapse/storage/schema/delta/27/event_actions.sql +++ b/synapse/storage/schema/delta/27/event_push_actions.sql @@ -13,7 +13,7 @@ * limitations under the License. */ -CREATE TABLE IF NOT EXISTS event_actions( +CREATE TABLE IF NOT EXISTS event_push_actions( room_id TEXT NOT NULL, event_id TEXT NOT NULL, user_id TEXT NOT NULL, @@ -23,4 +23,4 @@ CREATE TABLE IF NOT EXISTS event_actions( ); -CREATE INDEX event_actions_room_id_event_id_user_id_profile_tag on event_actions(room_id, event_id, user_id, profile_tag); +CREATE INDEX event_push_actions_room_id_event_id_user_id_profile_tag on event_push_actions(room_id, event_id, user_id, profile_tag); From 92a1e74b202757b0f4b577ccbd3e31d8dd4d6460 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jan 2016 14:17:35 +0000 Subject: [PATCH 20/33] fix tests --- tests/handlers/test_federation.py | 2 +- tests/handlers/test_room.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index a4758c03d..6acc4ebad 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -52,7 +52,7 @@ class FederationTestCase(unittest.TestCase): "get_users_in_room", "bulk_get_push_rules", "get_current_state", - "set_actions_for_event_and_users", + "set_push_actions_for_event_and_users", ]), resource_for_federation=NonCallableMock(), http_client=NonCallableMock(spec_set=[]), diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index ba20b3194..ff2b59712 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -46,7 +46,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): "get_users_in_room", "bulk_get_push_rules", "get_current_state", - "set_actions_for_event_and_users", + "set_push_actions_for_event_and_users", ]), resource_for_federation=NonCallableMock(), http_client=NonCallableMock(spec_set=[]), From f1b67730fa085755a7ab459b0239608bd3585a67 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jan 2016 14:50:36 +0000 Subject: [PATCH 21/33] Add unread_notif_count in incremental_sync_with_gap --- synapse/handlers/sync.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 64556c5eb..93c48d167 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -749,6 +749,13 @@ class SyncHandler(BaseHandler): if just_joined: state = yield self.get_state_at(room_id, now_token) + notifs = yield self.unread_notifs_for_room_id( + room_id, sync_config, ephemeral_by_room + ) + notif_count = None + if notifs is not None: + notif_count = len(notifs) + room_sync = JoinedSyncResult( room_id=room_id, timeline=batch, @@ -757,6 +764,7 @@ class SyncHandler(BaseHandler): account_data=self.account_data_for_room( room_id, tags_by_room, account_data_by_room ), + unread_notification_count=notif_count, ) logging.debug("Room sync: %r", room_sync) From d74c6ace247c31524dacf795108300e53bdbff55 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jan 2016 15:32:00 +0000 Subject: [PATCH 22/33] comma --- synapse/handlers/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 93c48d167..2ec42ee50 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -346,7 +346,7 @@ class SyncHandler(BaseHandler): account_data=self.account_data_for_room( room_id, tags_by_room, account_data_by_room ), - unread_notification_count=notif_count + unread_notification_count=notif_count, )) def account_data_for_user(self, account_data): From c77e7e60fc5d29e8a57bb82dd5aa8e72ae570d84 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jan 2016 15:49:06 +0000 Subject: [PATCH 23/33] Only joined rooms have unread_notif_count --- synapse/rest/client/v2_alpha/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index cd3aef9e0..095f96e21 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -311,12 +311,12 @@ class SyncRestServlet(RestServlet): }, "state": {"events": serialized_state}, "account_data": {"events": account_data}, - "unread_notification_count": room.unread_notification_count } if joined: ephemeral_events = filter.filter_room_ephemeral(room.ephemeral) result["ephemeral"] = {"events": ephemeral_events} + result["unread_notification_count"] = room.unread_notification_count return result From 4eb7b950c829dd8463df8ccd1095772452293a15 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jan 2016 18:11:17 +0000 Subject: [PATCH 24/33] = not == in sql --- synapse/storage/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 016c0adf8..3075d0225 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -75,7 +75,7 @@ class EventPushActionsStore(SQLBaseStore): " AND ea.room_id = ?" " AND (" " e.topological_ordering > ?" - " OR (e.topological_ordering == ? AND e.stream_ordering > ?)" + " OR (e.topological_ordering = ? AND e.stream_ordering > ?)" ")" ) txn.execute(sql, ( From 85ca8cb90c706ad40034a05282d751887d9bf05f Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 5 Jan 2016 13:32:39 +0000 Subject: [PATCH 25/33] comment typo --- synapse/push/bulk_push_rule_evaluator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 1c4e54ba4..c00acfd87 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -63,7 +63,7 @@ class BulkPushRuleEvaluator: Runs push rules for all users in a room. This is faster than running PushRuleEvaluator for each user because it fetches all the rules for all the users in one (batched) db query - rarher than doing multiple queries per-user. It currently uses + rather than doing multiple queries per-user. It currently uses the same logic to run the actual rules, but could be optimised further (see https://matrix.org/jira/browse/SYN-562) """ From c79f221192044203b9d32cfbd416a7fefeb34cd5 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Jan 2016 11:38:09 +0000 Subject: [PATCH 26/33] Add is_guest flag to users db to track whether a user is a guest user or not. Use this so we can run _filter_events_for_client when calculating event_push_actions. --- synapse/handlers/_base.py | 8 ++-- synapse/handlers/federation.py | 6 +-- synapse/handlers/register.py | 4 +- synapse/push/action_generator.py | 6 +-- synapse/push/bulk_push_rule_evaluator.py | 27 ++++++++++--- synapse/rest/client/v2_alpha/register.py | 5 ++- synapse/storage/event_push_actions.py | 4 +- synapse/storage/registration.py | 40 ++++++++++++++----- .../delta/{27 => 28}/event_push_actions.sql | 0 9 files changed, 69 insertions(+), 31 deletions(-) rename synapse/storage/schema/delta/{27 => 28}/event_push_actions.sql (100%) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 3115a5065..66e35de6e 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -23,8 +23,6 @@ from synapse.push.action_generator import ActionGenerator from synapse.util.logcontext import PreserveLoggingContext -from synapse.events.utils import serialize_event - import logging @@ -256,9 +254,9 @@ class BaseHandler(object): ) action_generator = ActionGenerator(self.store) - yield action_generator.handle_push_actions_for_event(serialize_event( - event, self.clock.time_msec() - )) + yield action_generator.handle_push_actions_for_event( + event, self + ) destinations = set(extra_destinations) for k, s in context.current_state.items(): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 764709b42..075b9e21c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -32,7 +32,7 @@ from synapse.crypto.event_signing import ( ) from synapse.types import UserID -from synapse.events.utils import prune_event, serialize_event +from synapse.events.utils import prune_event from synapse.util.retryutils import NotRetryingDestination @@ -246,8 +246,8 @@ class FederationHandler(BaseHandler): if not backfilled and not event.internal_metadata.is_outlier(): action_generator = ActionGenerator(self.store) - yield action_generator.handle_push_actions_for_event(serialize_event( - event, self.clock.time_msec()) + yield action_generator.handle_push_actions_for_event( + event, self ) @defer.inlineCallbacks diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 6f111ff63..1799a668c 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -84,7 +84,8 @@ class RegistrationHandler(BaseHandler): localpart=None, password=None, generate_token=True, - guest_access_token=None + guest_access_token=None, + make_guest=False ): """Registers a new client on the server. @@ -118,6 +119,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash=password_hash, was_guest=guest_access_token is not None, + make_guest=make_guest ) yield registered_user(self.distributor, user) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 5526324a6..bcd40798f 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -33,12 +33,12 @@ class ActionGenerator: # tag (ie. we just need all the users). @defer.inlineCallbacks - def handle_push_actions_for_event(self, event): + def handle_push_actions_for_event(self, event, handler): bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( - event['room_id'], self.store + event.room_id, self.store ) - actions_by_user = bulk_evaluator.action_for_event_by_user(event) + actions_by_user = yield bulk_evaluator.action_for_event_by_user(event, handler) yield self.store.set_push_actions_for_event_and_users( event, diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index c00acfd87..63d65b446 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -23,6 +23,8 @@ from synapse.types import UserID import baserules from push_rule_evaluator import PushRuleEvaluator +from synapse.events.utils import serialize_event + logger = logging.getLogger(__name__) @@ -54,7 +56,7 @@ def evaluator_for_room_id(room_id, store): display_names[ev.state_key] = ev.content.get("displayname") defer.returnValue(BulkPushRuleEvaluator( - room_id, rules_by_user, display_names, users + room_id, rules_by_user, display_names, users, store )) @@ -67,13 +69,15 @@ class BulkPushRuleEvaluator: the same logic to run the actual rules, but could be optimised further (see https://matrix.org/jira/browse/SYN-562) """ - def __init__(self, room_id, rules_by_user, display_names, users_in_room): + def __init__(self, room_id, rules_by_user, display_names, users_in_room, store): self.room_id = room_id self.rules_by_user = rules_by_user self.display_names = display_names self.users_in_room = users_in_room + self.store = store - def action_for_event_by_user(self, event): + @defer.inlineCallbacks + def action_for_event_by_user(self, event, handler): actions_by_user = {} for uid, rules in self.rules_by_user.items(): @@ -81,6 +85,13 @@ class BulkPushRuleEvaluator: if uid in self.display_names: display_name = self.display_names[uid] + is_guest = yield self.store.is_guest(UserID.from_string(uid)) + filtered = yield handler._filter_events_for_client( + uid, [event], is_guest=is_guest + ) + if len(filtered) == 0: + continue + for rule in rules: if 'enabled' in rule and not rule['enabled']: continue @@ -94,14 +105,20 @@ class BulkPushRuleEvaluator: if len(actions) > 0: actions_by_user[uid] = actions break - return actions_by_user + defer.returnValue(actions_by_user) @staticmethod def event_matches_rule(event, rule, display_name, room_member_count, profile_tag): matches = True + + # passing the clock all the way into here is extremely awkward and push + # rules do not care about any of the relative timestamps, so we just + # pass 0 for the current time. + client_event = serialize_event(event, 0) + for cond in rule['conditions']: matches &= PushRuleEvaluator._event_fulfills_condition( - event, cond, display_name, room_member_count, profile_tag + client_event, cond, display_name, room_member_count, profile_tag ) return matches diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 25389cede..c4d025b46 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -259,7 +259,10 @@ class RegisterRestServlet(RestServlet): def _do_guest_registration(self): if not self.hs.config.allow_guest_access: defer.returnValue((403, "Guest access is disabled")) - user_id, _ = yield self.registration_handler.register(generate_token=False) + user_id, _ = yield self.registration_handler.register( + generate_token=False, + make_guest=True + ) access_token = self.auth_handler.generate_access_token(user_id, ["guest = true"]) defer.returnValue((200, { "user_id": user_id, diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 3075d0225..0634af6b6 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -32,8 +32,8 @@ class EventPushActionsStore(SQLBaseStore): values = [] for uid, profile_tag, actions in tuples: values.append({ - 'room_id': event['room_id'], - 'event_id': event['event_id'], + 'room_id': event.room_id, + 'event_id': event.event_id, 'user_id': uid, 'profile_tag': profile_tag, 'actions': json.dumps(actions) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index f0fa0bd33..c79066f77 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError, Codes from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks class RegistrationStore(SQLBaseStore): @@ -73,7 +73,8 @@ class RegistrationStore(SQLBaseStore): ) @defer.inlineCallbacks - def register(self, user_id, token, password_hash, was_guest=False): + def register(self, user_id, token, password_hash, + was_guest=False, make_guest=False): """Attempts to register an account. Args: @@ -82,15 +83,18 @@ class RegistrationStore(SQLBaseStore): password_hash (str): Optional. The password hash for this user. was_guest (bool): Optional. Whether this is a guest account being upgraded to a non-guest account. + make_guest (boolean): True if the the new user should be guest, + false to add a regular user account. Raises: StoreError if the user_id could not be registered. """ yield self.runInteraction( "register", - self._register, user_id, token, password_hash, was_guest + self._register, user_id, token, password_hash, was_guest, make_guest ) + self.is_guest.invalidate((user_id,)) - def _register(self, txn, user_id, token, password_hash, was_guest): + def _register(self, txn, user_id, token, password_hash, was_guest, make_guest): now = int(self.clock.time()) next_id = self._access_tokens_id_gen.get_next_txn(txn) @@ -100,12 +104,14 @@ class RegistrationStore(SQLBaseStore): txn.execute("UPDATE users SET" " password_hash = ?," " upgrade_ts = ?" + " is_guest = ?" " WHERE name = ?", - [password_hash, now, user_id]) + [password_hash, now, make_guest, user_id]) else: - txn.execute("INSERT INTO users(name, password_hash, creation_ts) " - "VALUES (?,?,?)", - [user_id, password_hash, now]) + txn.execute("INSERT INTO users " + "(name, password_hash, creation_ts, is_guest) " + "VALUES (?,?,?,?)", + [user_id, password_hash, now, make_guest]) except self.database_engine.module.IntegrityError: raise StoreError( 400, "User ID already taken.", errcode=Codes.USER_IN_USE @@ -126,7 +132,7 @@ class RegistrationStore(SQLBaseStore): keyvalues={ "name": user_id, }, - retcols=["name", "password_hash"], + retcols=["name", "password_hash", "is_guest"], allow_none=True, ) @@ -136,7 +142,7 @@ class RegistrationStore(SQLBaseStore): """ def f(txn): sql = ( - "SELECT name, password_hash FROM users" + "SELECT name, password_hash, is_guest FROM users" " WHERE lower(name) = lower(?)" ) txn.execute(sql, (user_id,)) @@ -249,9 +255,21 @@ class RegistrationStore(SQLBaseStore): defer.returnValue(res if res else False) + @cachedInlineCallbacks() + def is_guest(self, user): + res = yield self._simple_select_one_onecol( + table="users", + keyvalues={"name": user.to_string()}, + retcol="is_guest", + allow_none=True, + desc="is_guest", + ) + + defer.returnValue(res if res else False) + def _query_for_auth(self, txn, token): sql = ( - "SELECT users.name, access_tokens.id as token_id" + "SELECT users.name, users.is_guest, access_tokens.id as token_id" " FROM users" " INNER JOIN access_tokens on users.name = access_tokens.user_id" " WHERE token = ?" diff --git a/synapse/storage/schema/delta/27/event_push_actions.sql b/synapse/storage/schema/delta/28/event_push_actions.sql similarity index 100% rename from synapse/storage/schema/delta/27/event_push_actions.sql rename to synapse/storage/schema/delta/28/event_push_actions.sql From ae1262a241aa816b9e0f19e628afcc83229af64f Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Jan 2016 11:58:20 +0000 Subject: [PATCH 27/33] Add schema change file for is_guest flag --- .../schema/delta/28/users_is_guest.sql | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 synapse/storage/schema/delta/28/users_is_guest.sql diff --git a/synapse/storage/schema/delta/28/users_is_guest.sql b/synapse/storage/schema/delta/28/users_is_guest.sql new file mode 100644 index 000000000..80792e85d --- /dev/null +++ b/synapse/storage/schema/delta/28/users_is_guest.sql @@ -0,0 +1,22 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE users ADD is_guest admin SMALLINT DEFAULT 0 NOT NULL; +/* + * NB: any guest users created between 27 and 28 will be incorrectly + * marked as not guests: we don't bother to fill these in correctly + * because guest access is not really complete in 27 anyway so it's + * very unlikley there will be any guest users created. + */ From 992928304f3c95f87a3297799965159d295432ea Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Jan 2016 11:58:46 +0000 Subject: [PATCH 28/33] Delete notifications for redacted events --- synapse/push/action_generator.py | 7 +++++++ synapse/storage/event_push_actions.py | 12 ++++++++++++ 2 files changed, 19 insertions(+) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index bcd40798f..4cf94f6c6 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -19,6 +19,8 @@ import bulk_push_rule_evaluator import logging +from synapse.api.constants import EventTypes + logger = logging.getLogger(__name__) @@ -34,6 +36,11 @@ class ActionGenerator: @defer.inlineCallbacks def handle_push_actions_for_event(self, event, handler): + if event.type == EventTypes.Redaction and event.redacts is not None: + yield self.store.remove_push_actions_for_event_id( + event.room_id, event.redacts + ) + bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( event.room_id, self.store ) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 0634af6b6..5b44431ab 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -93,6 +93,18 @@ class EventPushActionsStore(SQLBaseStore): ) defer.returnValue(ret) + @defer.inlineCallbacks + def remove_push_actions_for_event_id(self, room_id, event_id): + def f(txn): + txn.execute( + "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", + (room_id, event_id) + ) + yield self.runInteraction( + "remove_push_actions_for_event_id", + f + ) + class EventPushActionsTable(object): table_name = "event_push_actions" From 0e48f7f2458f08341131b3b90c78b7034fe02d14 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Jan 2016 16:46:41 +0000 Subject: [PATCH 29/33] fix tests --- tests/handlers/test_federation.py | 4 ++++ tests/handlers/test_room.py | 5 +++++ tests/storage/test_registration.py | 2 +- 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 6acc4ebad..029c09411 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -53,6 +53,8 @@ class FederationTestCase(unittest.TestCase): "bulk_get_push_rules", "get_current_state", "set_push_actions_for_event_and_users", + "is_guest", + "get_state_for_events", ]), resource_for_federation=NonCallableMock(), http_client=NonCallableMock(spec_set=[]), @@ -73,6 +75,8 @@ class FederationTestCase(unittest.TestCase): self.handlers.federation_handler = FederationHandler(self.hs) + self.datastore.get_state_for_events.return_value = {"$a:b": {}} + @defer.inlineCallbacks def test_msg(self): pdu = FrozenEvent({ diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index ff2b59712..b1c8e6152 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -47,6 +47,8 @@ class RoomMemberHandlerTestCase(unittest.TestCase): "bulk_get_push_rules", "get_current_state", "set_push_actions_for_event_and_users", + "get_state_for_events", + "is_guest", ]), resource_for_federation=NonCallableMock(), http_client=NonCallableMock(spec_set=[]), @@ -116,6 +118,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): defer.succeed([]) ) self.datastore.get_current_state.return_value = {} + self.datastore.get_state_for_events = lambda event_ids,types: {x: {} for x in event_ids} def annotate(_): ctx = Mock() @@ -198,6 +201,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): defer.succeed([]) ) self.datastore.get_current_state.return_value = {} + self.datastore.get_state_for_events = lambda event_ids,types: {x: {} for x in event_ids} def annotate(_): ctx = Mock() @@ -274,6 +278,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): defer.succeed([]) ) self.datastore.get_current_state.return_value = {} + self.datastore.get_state_for_events = lambda event_ids,types: {x: {} for x in event_ids} def annotate(_): ctx = Mock() diff --git a/tests/storage/test_registration.py b/tests/storage/test_registration.py index 0cce6c37d..4760131f9 100644 --- a/tests/storage/test_registration.py +++ b/tests/storage/test_registration.py @@ -45,7 +45,7 @@ class RegistrationStoreTestCase(unittest.TestCase): self.assertEquals( # TODO(paul): Surely this field should be 'user_id', not 'name' # Additionally surely it shouldn't come in a 1-element list - {"name": self.user_id, "password_hash": self.pwhash}, + {"name": self.user_id, "password_hash": self.pwhash, "is_guest": 0}, (yield self.store.get_user_by_id(self.user_id)) ) From b6a585348ae8a07dc8105242e182435a240e6b8f Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Jan 2016 17:16:02 +0000 Subject: [PATCH 30/33] Adding is_guest here won't work because it just constructs a dict of uid -> password hash --- synapse/storage/registration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index c79066f77..a52b67013 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -142,7 +142,7 @@ class RegistrationStore(SQLBaseStore): """ def f(txn): sql = ( - "SELECT name, password_hash, is_guest FROM users" + "SELECT name, password_hash FROM users" " WHERE lower(name) = lower(?)" ) txn.execute(sql, (user_id,)) From 09dc9854cd8f28b2e5ce90207cfc822d226ec2ad Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Jan 2016 17:44:10 +0000 Subject: [PATCH 31/33] comma style --- synapse/handlers/register.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 1799a668c..ba26d13d4 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -119,7 +119,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash=password_hash, was_guest=guest_access_token is not None, - make_guest=make_guest + make_guest=make_guest, ) yield registered_user(self.distributor, user) From 823b679232ed030ba9a2f609d11074c6b3111be2 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 7 Jan 2016 10:02:47 +0000 Subject: [PATCH 32/33] more commas --- synapse/push/bulk_push_rule_evaluator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 63d65b446..ce244fa95 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -42,7 +42,7 @@ def evaluator_for_room_id(room_id, store): uid: baserules.list_with_base_rules( [decode_rule_json(rule_list) for rule_list in rules_by_user[uid]] if uid in rules_by_user else [], - UserID.from_string(uid) + UserID.from_string(uid), ) for uid in users } From daadcf36c0bad93220d4fa422dce0c5740f63e3d Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 7 Jan 2016 10:15:35 +0000 Subject: [PATCH 33/33] This comma is actually important --- synapse/storage/registration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index a52b67013..ece71f2ee 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -103,7 +103,7 @@ class RegistrationStore(SQLBaseStore): if was_guest: txn.execute("UPDATE users SET" " password_hash = ?," - " upgrade_ts = ?" + " upgrade_ts = ?," " is_guest = ?" " WHERE name = ?", [password_hash, now, make_guest, user_id])