diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 8443e4b05..9a96e6fe8 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -53,7 +53,7 @@ class BulkPushRuleEvaluator(object): room_id = event.room_id rules_for_room = self._get_rules_for_room(room_id) - rules_by_user = yield rules_for_room.get_rules(context) + rules_by_user = yield rules_for_room.get_rules(event, context) # if this event is an invite event, we may need to run rules for the user # who's been invited, otherwise they won't get told they've been invited @@ -216,7 +216,7 @@ class RulesForRoom(object): self.invalidate_all_cb = _Invalidation(rules_for_room_cache, room_id) @defer.inlineCallbacks - def get_rules(self, context): + def get_rules(self, event, context): """Given an event context return the rules for all users who are currently in the room. """ @@ -224,6 +224,7 @@ class RulesForRoom(object): with (yield self.linearizer.queue(())): if state_group and self.state_group == state_group: + logger.debug("Using cached rules for %r", self.room_id) defer.returnValue(self.rules_by_user) ret_rules_by_user = {} @@ -236,6 +237,10 @@ class RulesForRoom(object): else: current_state_ids = context.current_state_ids + logger.debug( + "Looking for member changes in %r %r", state_group, current_state_ids + ) + # Loop through to see which member events we've seen and have rules # for and which we need to fetch for key in current_state_ids: @@ -273,15 +278,21 @@ class RulesForRoom(object): if missing_member_event_ids: # If we have some memebr events we haven't seen, look them up # and fetch push rules for them if appropriate. + logger.debug("Found new member events %r", missing_member_event_ids) yield self._update_rules_with_member_event_ids( - ret_rules_by_user, missing_member_event_ids, state_group + ret_rules_by_user, missing_member_event_ids, state_group, event ) + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "Returning push rules for %r %r", + self.room_id, ret_rules_by_user.keys(), + ) defer.returnValue(ret_rules_by_user) @defer.inlineCallbacks def _update_rules_with_member_event_ids(self, ret_rules_by_user, member_event_ids, - state_group): + state_group, event): """Update the partially filled rules_by_user dict by fetching rules for any newly joined users in the `member_event_ids` list. @@ -310,11 +321,23 @@ class RulesForRoom(object): for row in rows } + # If the event is a join event then it will be in current state evnts + # map but not in the DB, so we have to explicitly insert it. + if event.type == EventTypes.Member: + for event_id in member_event_ids.itervalues(): + if event_id == event.event_id: + members[event_id] = (event.state_key, event.membership) + + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Found members %r: %r", self.room_id, members.values()) + interested_in_user_ids = set( user_id for user_id, membership in members.itervalues() if membership == Membership.JOIN ) + logger.debug("Joined: %r", interested_in_user_ids) + if_users_with_pushers = yield self.store.get_if_users_have_pushers( interested_in_user_ids, on_invalidate=self.invalidate_all_cb, @@ -324,10 +347,14 @@ class RulesForRoom(object): uid for uid, have_pusher in if_users_with_pushers.iteritems() if have_pusher ) + logger.debug("With pushers: %r", user_ids) + users_with_receipts = yield self.store.get_users_with_read_receipts_in_room( self.room_id, on_invalidate=self.invalidate_all_cb, ) + logger.debug("With receipts: %r", users_with_receipts) + # any users with pushers must be ours: they have pushers for uid in users_with_receipts: if uid in interested_in_user_ids: @@ -348,6 +375,7 @@ class RulesForRoom(object): # as it keeps a reference to self and will stop this instance from being # GC'd if it gets dropped from the rules_to_user cache. Instead use # `self.invalidate_all_cb` + logger.debug("Invalidating RulesForRoom for %r", self.room_id) self.sequence += 1 self.state_group = object() self.member_map = {}