diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index c77afe7f5..9c92ea01e 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -21,7 +21,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.types import UserID, RoomAlias, Requester from synapse.push.action_generator import ActionGenerator -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import PreserveLoggingContext, preserve_fn import logging @@ -377,6 +377,12 @@ class BaseHandler(object): event, context=context ) + # this intentionally does not yield: we don't care about the result + # and don't need to wait for it. + preserve_fn(self.hs.get_pusherpool().on_new_notifications)( + event_stream_id, max_stream_id + ) + destinations = set() for k, s in context.current_state.items(): try: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 026ebe52b..fc5e0b059 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -26,7 +26,7 @@ from synapse.api.errors import ( from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import PreserveLoggingContext, preserve_fn from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.util.frozenutils import unfreeze @@ -1094,6 +1094,12 @@ class FederationHandler(BaseHandler): context=context, ) + # this intentionally does not yield: we don't care about the result + # and don't need to wait for it. + preserve_fn(self.hs.get_pusherpool().on_new_notifications)( + event_stream_id, max_stream_id + ) + defer.returnValue((context, event_stream_id, max_stream_id)) @defer.inlineCallbacks diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 76d7eb7ce..7f94591dc 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -70,11 +70,17 @@ def _get_rules(room_id, user_ids, store): @defer.inlineCallbacks def evaluator_for_room_id(room_id, hs, store): - results = yield store.get_receipts_for_room(room_id, "m.read") - user_ids = [ - row["user_id"] for row in results - if hs.is_mine_id(row["user_id"]) - ] + users_with_pushers = yield store.get_users_with_pushers_in_room(room_id) + receipts = yield store.get_receipts_for_room(room_id, "m.read") + + # any users with pushers must be ours: they have pushers + user_ids = set(users_with_pushers) + for r in receipts: + if hs.is_mine_id(r['user_id']): + user_ids.add(r['user_id']) + + user_ids = list(user_ids) + rules_by_user = yield _get_rules(room_id, user_ids, store) defer.returnValue(BulkPushRuleEvaluator( @@ -101,10 +107,15 @@ class BulkPushRuleEvaluator: def action_for_event_by_user(self, event, handler, current_state): actions_by_user = {} - users_dict = yield self.store.are_guests(self.rules_by_user.keys()) + # None of these users can be peeking since this list of users comes + # from the set of users in the room, so we know for sure they're all + # actually in the room. + user_tuples = [ + (u, False) for u in self.rules_by_user.keys() + ] filtered_by_user = yield handler.filter_events_for_clients( - users_dict.items(), [event], {event.event_id: current_state} + user_tuples, [event], {event.event_id: current_state} ) room_members = yield self.store.get_users_in_room(self.room_id) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 9be486936..d69588564 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -13,60 +13,188 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.push import Pusher, PusherConfigException +from synapse.push import PusherConfigException -from twisted.internet import defer +from twisted.internet import defer, reactor import logging +import push_rule_evaluator +import push_tools logger = logging.getLogger(__name__) -class HttpPusher(Pusher): - def __init__(self, _hs, user_id, app_id, - app_display_name, device_display_name, pushkey, pushkey_ts, - data, last_token, last_success, failing_since): - super(HttpPusher, self).__init__( - _hs, - user_id, - app_id, - app_display_name, - device_display_name, - pushkey, - pushkey_ts, - data, - last_token, - last_success, - failing_since +class HttpPusher(object): + INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes + MAX_BACKOFF_SEC = 60 * 60 + + # This one's in ms because we compare it against the clock + GIVE_UP_AFTER_MS = 24 * 60 * 60 * 1000 + + def __init__(self, hs, pusherdict): + self.hs = hs + self.store = self.hs.get_datastore() + self.clock = self.hs.get_clock() + self.user_id = pusherdict['user_name'] + self.app_id = pusherdict['app_id'] + self.app_display_name = pusherdict['app_display_name'] + self.device_display_name = pusherdict['device_display_name'] + self.pushkey = pusherdict['pushkey'] + self.pushkey_ts = pusherdict['ts'] + self.data = pusherdict['data'] + self.last_stream_ordering = pusherdict['last_stream_ordering'] + self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC + self.failing_since = pusherdict['failing_since'] + self.timed_call = None + + # This is the highest stream ordering we know it's safe to process. + # When new events arrive, we'll be given a window of new events: we + # should honour this rather than just looking for anything higher + # because of potential out-of-order event serialisation. This starts + # off as None though as we don't know any better. + self.max_stream_ordering = None + + if 'data' not in pusherdict: + raise PusherConfigException( + "No 'data' key for HTTP pusher" + ) + self.data = pusherdict['data'] + + self.name = "%s/%s/%s" % ( + pusherdict['user_name'], + pusherdict['app_id'], + pusherdict['pushkey'], ) - if 'url' not in data: + + if 'url' not in self.data: raise PusherConfigException( "'url' required in data for HTTP pusher" ) - self.url = data['url'] - self.http_client = _hs.get_simple_http_client() + self.url = self.data['url'] + self.http_client = hs.get_simple_http_client() self.data_minus_url = {} self.data_minus_url.update(self.data) del self.data_minus_url['url'] + def on_started(self): + self._process() + + def on_new_notifications(self, min_stream_ordering, max_stream_ordering): + self.max_stream_ordering = max_stream_ordering + self._process() + + def on_timer(self): + self._process() + + def on_stop(self): + if self.timed_call: + self.timed_call.cancel() + + @defer.inlineCallbacks + def _process(self): + unprocessed = yield self.store.get_unread_push_actions_for_user_in_range( + self.user_id, self.last_stream_ordering, self.max_stream_ordering + ) + + for push_action in unprocessed: + processed = yield self._process_one(push_action) + if processed: + self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC + self.last_stream_ordering = push_action['stream_ordering'] + self.store.update_pusher_last_stream_ordering_and_success( + self.app_id, self.pushkey, self.user_id, + self.last_stream_ordering, + self.clock.time_msec() + ) + self.failing_since = None + yield self.store.update_pusher_failing_since( + self.app_id, self.pushkey, self.user_id, + self.failing_since + ) + else: + self.failing_since = self.clock.time_msec() + yield self.store.update_pusher_failing_since( + self.app_id, self.pushkey, self.user_id, + self.failing_since + ) + + if ( + self.failing_since and + self.failing_since < + self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER + ): + # we really only give up so that if the URL gets + # fixed, we don't suddenly deliver a load + # of old notifications. + logger.warn("Giving up on a notification to user %s, " + "pushkey %s", + self.user_id, self.pushkey) + self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC + self.last_stream_ordering = push_action['stream_ordering'] + yield self.store.update_pusher_last_stream_ordering( + self.app_id, + self.pushkey, + self.user_id, + self.last_stream_ordering + ) + + self.failing_since = None + yield self.store.update_pusher_failing_since( + self.app_id, + self.pushkey, + self.user_id, + self.failing_since + ) + else: + logger.info("Push failed: delaying for %ds", self.backoff_delay) + self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer) + self.backoff_delay = min(self.backoff_delay, self.MAX_BACKOFF_SEC) + break + + @defer.inlineCallbacks + def _process_one(self, push_action): + if 'notify' not in push_action['actions']: + defer.returnValue(True) + + tweaks = push_rule_evaluator.PushRuleEvaluator.tweaks_for_actions(push_action['actions']) + badge = yield push_tools.get_badge_count(self.hs, self.user_id) + + event = yield self.store.get_event(push_action['event_id'], allow_none=True) + if event is None: + defer.returnValue(True) # It's been redacted + rejected = yield self.dispatch_push(event, tweaks, badge) + if rejected is False: + defer.returnValue(False) + + if isinstance(rejected, list) or isinstance(rejected, tuple): + for pk in rejected: + if pk != self.pushkey: + # for sanity, we only remove the pushkey if it + # was the one we actually sent... + logger.warn( + ("Ignoring rejected pushkey %s because we" + " didn't send it"), pk + ) + else: + logger.info( + "Pushkey %s was rejected: removing", + pk + ) + yield self.hs.get_pusherpool().remove_pusher( + self.app_id, pk, self.user_id + ) + defer.returnValue(True) + @defer.inlineCallbacks def _build_notification_dict(self, event, tweaks, badge): - # we probably do not want to push for every presence update - # (we may want to be able to set up notifications when specific - # people sign in, but we'd want to only deliver the pertinent ones) - # Actually, presence events will not get this far now because we - # need to filter them out in the main Pusher code. - if 'event_id' not in event: - defer.returnValue(None) - - ctx = yield self.get_context_for_event(event) + ctx = yield push_tools.get_context_for_event(self.hs, event) d = { 'notification': { - 'id': event['event_id'], - 'room_id': event['room_id'], - 'type': event['type'], - 'sender': event['user_id'], + 'id': event.event_id, + 'room_id': event.room_id, + 'type': event.type, + 'sender': event.user_id, 'counts': { # -- we don't mark messages as read yet so # we have no way of knowing # Just set the badge to 1 until we have read receipts @@ -84,11 +212,11 @@ class HttpPusher(Pusher): ] } } - if event['type'] == 'm.room.member': - d['notification']['membership'] = event['content']['membership'] - d['notification']['user_is_target'] = event['state_key'] == self.user_id + if event.type == 'm.room.member': + d['notification']['membership'] = event.content['membership'] + d['notification']['user_is_target'] = event.state_key == self.user_id if 'content' in event: - d['notification']['content'] = event['content'] + d['notification']['content'] = event.content if len(ctx['aliases']): d['notification']['room_alias'] = ctx['aliases'][0] diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py new file mode 100644 index 000000000..e1e61e49e --- /dev/null +++ b/synapse/push/push_tools.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + + +@defer.inlineCallbacks +def get_badge_count(hs, user_id): + invites, joins = yield defer.gatherResults([ + hs.get_datastore().get_invited_rooms_for_user(user_id), + hs.get_datastore().get_rooms_for_user(user_id), + ], consumeErrors=True) + + my_receipts_by_room = yield hs.get_datastore().get_receipts_for_user( + user_id, "m.read", + ) + + badge = len(invites) + + for r in joins: + if r.room_id in my_receipts_by_room: + last_unread_event_id = my_receipts_by_room[r.room_id] + + notifs = yield ( + hs.get_datastore().get_unread_event_push_actions_by_room_for_user( + r.room_id, user_id, last_unread_event_id + ) + ) + badge += notifs["notify_count"] + defer.returnValue(badge) + + +@defer.inlineCallbacks +def get_context_for_event(hs, ev): + name_aliases = yield hs.get_datastore().get_room_name_and_aliases( + ev.room_id + ) + + ctx = {'aliases': name_aliases[1]} + if name_aliases[0] is not None: + ctx['name'] = name_aliases[0] + + their_member_events_for_room = yield hs.get_datastore().get_current_state( + room_id=ev.room_id, + event_type='m.room.member', + state_key=ev.user_id + ) + for mev in their_member_events_for_room: + if mev.content['membership'] == 'join' and 'displayname' in mev.content: + dn = mev.content['displayname'] + if dn is not None: + ctx['sender_display_name'] = dn + + defer.returnValue(ctx) diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py new file mode 100644 index 000000000..496083750 --- /dev/null +++ b/synapse/push/pusher.py @@ -0,0 +1,10 @@ +from httppusher import HttpPusher + +PUSHER_TYPES = { + 'http': HttpPusher +} + + +def create_pusher(hs, pusherdict): + if pusherdict['kind'] in PUSHER_TYPES: + return PUSHER_TYPES[pusherdict['kind']](hs, pusherdict) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 0b463c6fd..b67ad455e 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -16,9 +16,10 @@ from twisted.internet import defer -from .httppusher import HttpPusher +import pusher from synapse.push import PusherConfigException from synapse.util.logcontext import preserve_fn +from synapse.util.async import run_on_reactor import logging @@ -48,7 +49,7 @@ class PusherPool: # will then get pulled out of the database, # recreated, added and started: this means we have only one # code path adding pushers. - self._create_pusher({ + pusher.create_pusher(self.hs, { "user_name": user_id, "kind": kind, "app_id": app_id, @@ -58,10 +59,18 @@ class PusherPool: "ts": time_now_msec, "lang": lang, "data": data, - "last_token": None, + "last_stream_ordering": None, "last_success": None, "failing_since": None }) + + # create the pusher setting last_stream_ordering to the current maximum + # stream ordering in event_push_actions, so it will process + # pushes from this point onwards. + last_stream_ordering = ( + yield self.store.get_latest_push_action_stream_ordering() + ) + yield self.store.add_pusher( user_id=user_id, access_token=access_token, @@ -73,6 +82,7 @@ class PusherPool: pushkey_ts=time_now_msec, lang=lang, data=data, + last_stream_ordering=last_stream_ordering, profile_tag=profile_tag, ) yield self._refresh_pusher(app_id, pushkey, user_id) @@ -106,26 +116,19 @@ class PusherPool: ) yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) - def _create_pusher(self, pusherdict): - if pusherdict['kind'] == 'http': - return HttpPusher( - self.hs, - user_id=pusherdict['user_name'], - app_id=pusherdict['app_id'], - app_display_name=pusherdict['app_display_name'], - device_display_name=pusherdict['device_display_name'], - pushkey=pusherdict['pushkey'], - pushkey_ts=pusherdict['ts'], - data=pusherdict['data'], - last_token=pusherdict['last_token'], - last_success=pusherdict['last_success'], - failing_since=pusherdict['failing_since'] - ) - else: - raise PusherConfigException( - "Unknown pusher type '%s' for user %s" % - (pusherdict['kind'], pusherdict['user_name']) + @defer.inlineCallbacks + def on_new_notifications(self, min_stream_id, max_stream_id): + yield run_on_reactor() + try: + users_affected = yield self.store.get_push_action_users_in_range( + min_stream_id, max_stream_id ) + for u in users_affected: + if u in self.pushers: + for p in self.pushers[u].values(): + p.on_new_notifications(min_stream_id, max_stream_id) + except: + logger.exception("Exception in pusher on_new_notifications") @defer.inlineCallbacks def _refresh_pusher(self, app_id, pushkey, user_id): @@ -146,30 +149,34 @@ class PusherPool: logger.info("Starting %d pushers", len(pushers)) for pusherdict in pushers: try: - p = self._create_pusher(pusherdict) + p = pusher.create_pusher(self.hs, pusherdict) except PusherConfigException: logger.exception("Couldn't start a pusher: caught PusherConfigException") continue if p: - fullid = "%s:%s:%s" % ( + appid_pushkey = "%s:%s" % ( pusherdict['app_id'], pusherdict['pushkey'], - pusherdict['user_name'] ) - if fullid in self.pushers: - self.pushers[fullid].stop() - self.pushers[fullid] = p - preserve_fn(p.start)() + byuser = self.pushers.setdefault(pusherdict['user_name'], {}) + + if appid_pushkey in byuser: + byuser[appid_pushkey].on_stop() + byuser[appid_pushkey] = p + preserve_fn(p.on_started)() logger.info("Started pushers") @defer.inlineCallbacks def remove_pusher(self, app_id, pushkey, user_id): - fullid = "%s:%s:%s" % (app_id, pushkey, user_id) - if fullid in self.pushers: - logger.info("Stopping pusher %s", fullid) - self.pushers[fullid].stop() - del self.pushers[fullid] + appid_pushkey = "%s:%s" % (app_id, pushkey) + + byuser = self.pushers.get(user_id, {}) + + if appid_pushkey in byuser: + logger.info("Stopping pusher %s / %s", user_id, appid_pushkey) + byuser[appid_pushkey].on_stop() + del byuser[appid_pushkey] yield self.store.delete_pusher_by_app_id_pushkey_user_id( app_id, pushkey, user_id ) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 3933b6e2c..5f61743e3 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -100,6 +100,54 @@ class EventPushActionsStore(SQLBaseStore): ) defer.returnValue(ret) + @defer.inlineCallbacks + def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering): + def f(txn): + sql = ( + "SELECT DISTINCT(user_id) FROM event_push_actions WHERE" + " stream_ordering >= ? AND stream_ordering >= ?" + ) + txn.execute(sql, (min_stream_ordering, max_stream_ordering)) + return [r[0] for r in txn.fetchall()] + ret = yield self.runInteraction("get_push_action_users_in_range", f) + defer.returnValue(ret) + + @defer.inlineCallbacks + def get_unread_push_actions_for_user_in_range(self, user_id, + min_stream_ordering, + max_stream_ordering=None): + def f(txn): + sql = ( + "SELECT event_id, stream_ordering, actions" + " FROM event_push_actions" + " WHERE user_id = ? AND stream_ordering > ?" + ) + args = [user_id, min_stream_ordering] + if max_stream_ordering is not None: + sql += " AND stream_ordering <= ?" + args.append(max_stream_ordering) + sql += " ORDER BY stream_ordering ASC" + txn.execute(sql, args) + return txn.fetchall() + ret = yield self.runInteraction("get_unread_push_actions_for_user_in_range", f) + defer.returnValue([ + { + "event_id": row[0], + "stream_ordering": row[1], + "actions": json.loads(row[2]), + } for row in ret + ]) + + @defer.inlineCallbacks + def get_latest_push_action_stream_ordering(self): + def f(txn): + txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions") + return txn.fetchone() + result = yield self.runInteraction( + "get_latest_push_action_stream_ordering", f + ) + defer.returnValue(result[0] or 0) + def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id): # Sad that we have to blow away the cache for the whole room here txn.call_after( diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5d299a113..ceae8715c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -61,6 +61,17 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def persist_events(self, events_and_contexts, backfilled=False): + """ + Write events to the database + Args: + events_and_contexts: list of tuples of (event, context) + backfilled: ? + + Returns: Tuple of stream_orderings where the first is the minimum and + last is the maximum stream ordering assigned to the events when + persisting. + + """ if not events_and_contexts: return @@ -191,6 +202,7 @@ class EventsStore(SQLBaseStore): txn.call_after(self._get_current_state_for_key.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) + txn.call_after(self.get_users_with_pushers_in_room.invalidate, (event.room_id,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,)) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index d1669c778..f7886dd1b 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -18,6 +18,8 @@ from twisted.internet import defer from canonicaljson import encode_canonical_json +from synapse.util.caches.descriptors import cachedInlineCallbacks + import logging import simplejson as json import types @@ -107,31 +109,46 @@ class PusherStore(SQLBaseStore): "get_all_updated_pushers", get_all_updated_pushers_txn ) + @cachedInlineCallbacks(num_args=1) + def get_users_with_pushers_in_room(self, room_id): + users = yield self.get_users_in_room(room_id) + + result = yield self._simple_select_many_batch( + 'pushers', 'user_name', users, ['user_name'] + ) + + defer.returnValue([r['user_name'] for r in result]) + @defer.inlineCallbacks def add_pusher(self, user_id, access_token, kind, app_id, app_display_name, device_display_name, - pushkey, pushkey_ts, lang, data, profile_tag=""): - with self._pushers_id_gen.get_next() as stream_id: - yield self._simple_upsert( - "pushers", - dict( - app_id=app_id, - pushkey=pushkey, - user_name=user_id, - ), - dict( - access_token=access_token, - kind=kind, - app_display_name=app_display_name, - device_display_name=device_display_name, - ts=pushkey_ts, - lang=lang, - data=encode_canonical_json(data), - profile_tag=profile_tag, - id=stream_id, - ), - desc="add_pusher", - ) + pushkey, pushkey_ts, lang, data, last_stream_ordering, + profile_tag=""): + def f(txn): + txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) + with self._pushers_id_gen.get_next() as stream_id: + return self._simple_upsert_txn( + txn, + "pushers", + dict( + app_id=app_id, + pushkey=pushkey, + user_name=user_id, + ), + dict( + access_token=access_token, + kind=kind, + app_display_name=app_display_name, + device_display_name=device_display_name, + ts=pushkey_ts, + lang=lang, + data=encode_canonical_json(data), + last_stream_ordering=last_stream_ordering, + profile_tag=profile_tag, + id=stream_id, + ), + ) + defer.returnValue((yield self.runInteraction("add_pusher", f))) @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): @@ -153,22 +170,28 @@ class PusherStore(SQLBaseStore): ) @defer.inlineCallbacks - def update_pusher_last_token(self, app_id, pushkey, user_id, last_token): + def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id, + last_stream_ordering): yield self._simple_update_one( "pushers", {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, - {'last_token': last_token}, - desc="update_pusher_last_token", + {'last_stream_ordering': last_stream_ordering}, + desc="update_pusher_last_stream_ordering", ) @defer.inlineCallbacks - def update_pusher_last_token_and_success(self, app_id, pushkey, user_id, - last_token, last_success): + def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey, + user_id, + last_stream_ordering, + last_success): yield self._simple_update_one( "pushers", {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, - {'last_token': last_token, 'last_success': last_success}, - desc="update_pusher_last_token_and_success", + { + 'last_stream_ordering': last_stream_ordering, + 'last_success': last_success + }, + desc="update_pusher_last_stream_ordering_and_success", ) @defer.inlineCallbacks diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index d46a963bb..701dd2f65 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -319,26 +319,6 @@ class RegistrationStore(SQLBaseStore): defer.returnValue(res if res else False) - @cachedList(cache=is_guest.cache, list_name="user_ids", num_args=1, - inlineCallbacks=True) - def are_guests(self, user_ids): - sql = "SELECT name, is_guest FROM users WHERE name IN (%s)" % ( - ",".join("?" for _ in user_ids), - ) - - rows = yield self._execute( - "are_guests", self.cursor_to_dict, sql, *user_ids - ) - - result = {user_id: False for user_id in user_ids} - - result.update({ - row["name"]: bool(row["is_guest"]) - for row in rows - }) - - defer.returnValue(result) - def _query_for_auth(self, txn, token): sql = ( "SELECT users.name, users.is_guest, access_tokens.id as token_id" diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 66e7a40e3..22a690aa8 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -58,6 +58,7 @@ class RoomMemberStore(SQLBaseStore): txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) + txn.call_after(self.get_users_with_pushers_in_room.invalidate, (event.room_id,)) txn.call_after( self._membership_stream_cache.entity_has_changed, event.state_key, event.internal_metadata.stream_ordering diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py new file mode 100644 index 000000000..7e0e385fb --- /dev/null +++ b/synapse/storage/schema/delta/31/pushers.py @@ -0,0 +1,75 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Change the last_token to last_stream_ordering now that pushers no longer +# listen on an event stream but instead select out of the event_push_actions +# table. + + +import logging + +logger = logging.getLogger(__name__) + + +def token_to_stream_ordering(token): + return int(token[1:].split('_')[0]) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + logger.info("Porting pushers table, delta 31...") + cur.execute(""" + CREATE TABLE IF NOT EXISTS pushers2 ( + id BIGINT PRIMARY KEY, + user_name TEXT NOT NULL, + access_token BIGINT DEFAULT NULL, + profile_tag VARCHAR(32) NOT NULL, + kind VARCHAR(8) NOT NULL, + app_id VARCHAR(64) NOT NULL, + app_display_name VARCHAR(64) NOT NULL, + device_display_name VARCHAR(128) NOT NULL, + pushkey TEXT NOT NULL, + ts BIGINT NOT NULL, + lang VARCHAR(8), + data TEXT, + last_stream_ordering INTEGER, + last_success BIGINT, + failing_since BIGINT, + UNIQUE (app_id, pushkey, user_name) + ) + """) + cur.execute("""SELECT + id, user_name, access_token, profile_tag, kind, + app_id, app_display_name, device_display_name, + pushkey, ts, lang, data, last_token, last_success, + failing_since + FROM pushers + """) + count = 0 + for row in cur.fetchall(): + row = list(row) + row[12] = token_to_stream_ordering(row[12]) + cur.execute(database_engine.convert_param_style(""" + INSERT into pushers2 ( + id, user_name, access_token, profile_tag, kind, + app_id, app_display_name, device_display_name, + pushkey, ts, lang, data, last_stream_ordering, last_success, + failing_since + ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))), + row + ) + count += 1 + cur.execute("DROP TABLE pushers") + cur.execute("ALTER TABLE pushers2 RENAME TO pushers") + logger.info("Moved %d pushers to new table", count)