Catch any exceptions in the pusher loop. Use a lower timeout for pushers so we can see if they're actually still running.

This commit is contained in:
David Baker 2015-06-05 11:40:22 +01:00
parent da84946de4
commit b8690dd840

View File

@ -24,6 +24,7 @@ import baserules
import logging import logging
import simplejson as json import simplejson as json
import re import re
import random
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -256,134 +257,154 @@ class Pusher(object):
logger.info("Pusher %s for user %s starting from token %s", logger.info("Pusher %s for user %s starting from token %s",
self.pushkey, self.user_name, self.last_token) self.pushkey, self.user_name, self.last_token)
wait = 0
while self.alive: while self.alive:
from_tok = StreamToken.from_string(self.last_token) try:
config = PaginationConfig(from_token=from_tok, limit='1') if wait > 0:
chunk = yield self.evStreamHandler.get_stream( yield synapse.util.async.sleep(wait)
self.user_name, config, yield self.get_and_dispatch()
timeout=100*365*24*60*60*1000, affect_presence=False wait = 0
) except:
if wait == 0:
# limiting to 1 may get 1 event plus 1 presence event, so wait = 1
# pick out the actual event else:
single_event = None wait = min(wait * 2, 1800)
for c in chunk['chunk']: logger.exception(
if 'event_id' in c: # Hmmm... "Exception in pusher loop for pushkey %s. Pausing for %ds",
single_event = c self.pushkey, wait
break
if not single_event:
self.last_token = chunk['end']
continue
if not self.alive:
continue
processed = False
actions = yield self._actions_for_event(single_event)
tweaks = _tweaks_for_actions(actions)
if len(actions) == 0:
logger.warn("Empty actions! Using default action.")
actions = Pusher.DEFAULT_ACTIONS
if 'notify' not in actions and 'dont_notify' not in actions:
logger.warn("Neither notify nor dont_notify in actions: adding default")
actions.extend(Pusher.DEFAULT_ACTIONS)
if 'dont_notify' in actions:
logger.debug(
"%s for %s: dont_notify",
single_event['event_id'], self.user_name
) )
@defer.inlineCallbacks
def get_and_dispatch(self):
from_tok = StreamToken.from_string(self.last_token)
config = PaginationConfig(from_token=from_tok, limit='1')
timeout = (300 + random.randint(-60, 60)) * 1000
chunk = yield self.evStreamHandler.get_stream(
self.user_name, config,
timeout=timeout, affect_presence=False
)
# limiting to 1 may get 1 event plus 1 presence event, so
# pick out the actual event
single_event = None
for c in chunk['chunk']:
if 'event_id' in c: # Hmmm...
single_event = c
break
if not single_event:
self.last_token = chunk['end']
logger.debug("Event stream timeout for pushkey %s", self.pushkey)
return
if not self.alive:
return
processed = False
actions = yield self._actions_for_event(single_event)
tweaks = _tweaks_for_actions(actions)
if len(actions) == 0:
logger.warn("Empty actions! Using default action.")
actions = Pusher.DEFAULT_ACTIONS
if 'notify' not in actions and 'dont_notify' not in actions:
logger.warn("Neither notify nor dont_notify in actions: adding default")
actions.extend(Pusher.DEFAULT_ACTIONS)
if 'dont_notify' in actions:
logger.debug(
"%s for %s: dont_notify",
single_event['event_id'], self.user_name
)
processed = True
else:
rejected = yield self.dispatch_push(single_event, tweaks)
self.has_unread = True
if isinstance(rejected, list) or isinstance(rejected, tuple):
processed = True processed = True
else: for pk in rejected:
rejected = yield self.dispatch_push(single_event, tweaks) if pk != self.pushkey:
self.has_unread = True # for sanity, we only remove the pushkey if it
if isinstance(rejected, list) or isinstance(rejected, tuple): # was the one we actually sent...
processed = True logger.warn(
for pk in rejected: ("Ignoring rejected pushkey %s because we"
if pk != self.pushkey: " didn't send it"), pk
# for sanity, we only remove the pushkey if it )
# was the one we actually sent... else:
logger.warn( logger.info(
("Ignoring rejected pushkey %s because we" "Pushkey %s was rejected: removing",
" didn't send it"), pk pk
) )
else: yield self.hs.get_pusherpool().remove_pusher(
logger.info( self.app_id, pk, self.user_name
"Pushkey %s was rejected: removing", )
pk
)
yield self.hs.get_pusherpool().remove_pusher(
self.app_id, pk, self.user_name
)
if not self.alive: if not self.alive:
continue return
if processed: if processed:
self.backoff_delay = Pusher.INITIAL_BACKOFF self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end'] self.last_token = chunk['end']
self.store.update_pusher_last_token_and_success( self.store.update_pusher_last_token_and_success(
self.app_id,
self.pushkey,
self.user_name,
self.last_token,
self.clock.time_msec()
)
if self.failing_since:
self.failing_since = None
self.store.update_pusher_failing_since(
self.app_id, self.app_id,
self.pushkey, self.pushkey,
self.user_name, self.user_name,
self.last_token, self.failing_since)
self.clock.time_msec() else:
if not self.failing_since:
self.failing_since = self.clock.time_msec()
self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
self.user_name,
self.failing_since
)
if (self.failing_since and
self.failing_since <
self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
# we really only give up so that if the URL gets
# fixed, we don't suddenly deliver a load
# of old notifications.
logger.warn("Giving up on a notification to user %s, "
"pushkey %s",
self.user_name, self.pushkey)
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end']
self.store.update_pusher_last_token(
self.app_id,
self.pushkey,
self.user_name,
self.last_token
)
self.failing_since = None
self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
self.user_name,
self.failing_since
) )
if self.failing_since:
self.failing_since = None
self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
self.user_name,
self.failing_since)
else: else:
if not self.failing_since: logger.warn("Failed to dispatch push for user %s "
self.failing_since = self.clock.time_msec() "(failing for %dms)."
self.store.update_pusher_failing_since( "Trying again in %dms",
self.app_id, self.user_name,
self.pushkey, self.clock.time_msec() - self.failing_since,
self.user_name, self.backoff_delay)
self.failing_since yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
) self.backoff_delay *= 2
if self.backoff_delay > Pusher.MAX_BACKOFF:
if (self.failing_since and self.backoff_delay = Pusher.MAX_BACKOFF
self.failing_since <
self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
# we really only give up so that if the URL gets
# fixed, we don't suddenly deliver a load
# of old notifications.
logger.warn("Giving up on a notification to user %s, "
"pushkey %s",
self.user_name, self.pushkey)
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end']
self.store.update_pusher_last_token(
self.app_id,
self.pushkey,
self.user_name,
self.last_token
)
self.failing_since = None
self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
self.user_name,
self.failing_since
)
else:
logger.warn("Failed to dispatch push for user %s "
"(failing for %dms)."
"Trying again in %dms",
self.user_name,
self.clock.time_msec() - self.failing_since,
self.backoff_delay)
yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
self.backoff_delay *= 2
if self.backoff_delay > Pusher.MAX_BACKOFF:
self.backoff_delay = Pusher.MAX_BACKOFF
def stop(self): def stop(self):
self.alive = False self.alive = False