diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index f9954df39..74e3a7056 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -19,6 +19,7 @@ import logging from synapse.util.metrics import Measure from synapse.util.async import run_on_reactor +from synapse.util.logcontext import LoggingContext logger = logging.getLogger(__name__) @@ -56,6 +57,8 @@ class EmailPusher(object): # See httppusher self.max_stream_ordering = None + self.processing = False + @defer.inlineCallbacks def on_started(self): self.throttle_params = yield self.store.get_throttle_params_by_room( @@ -63,20 +66,48 @@ class EmailPusher(object): ) yield self._process() + def on_stop(self): + if self.timed_call: + self.timed_call.cancel() + @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): - with Measure(self.clock, "push.on_new_notifications"): - self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) - yield self._process() + self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) + yield self._process() @defer.inlineCallbacks def on_timer(self): self.timed_call = None - with Measure(self.clock, "push.on_timer"): - yield self._process() + yield self._process() @defer.inlineCallbacks def _process(self): + if self.processing: + return + + with LoggingContext("emailpush._process"): + with Measure(self.clock, "emailpush._process"): + try: + self.processing = True + # if the max ordering changes while we're running _unsafe_process, + # call it again, and so on until we've caught up. + while True: + starting_max_ordering = self.max_stream_ordering + try: + yield self._unsafe_process() + except: + logger.exception("Exception processing notifs") + if self.max_stream_ordering == starting_max_ordering: + break + finally: + self.processing = False + + def _unsafe_process(self): + """ + Main logic of the push loop without the wrapper function that sets + up logging, measures and guards against multiple instances of it + being run. + """ last_notifs = yield self.store.get_time_of_latest_push_action_by_room_for_user( self.user_id ) @@ -118,9 +149,9 @@ class EmailPusher(object): if soonest_due_at is None or should_notify_at < soonest_due_at: soonest_due_at = should_notify_at - if self.timed_call is not None: - self.timed_call.cancel() - self.timed_call = None + if self.timed_call is not None: + self.timed_call.cancel() + self.timed_call = None if soonest_due_at is not None: self.timed_call = reactor.callLater(