This commit is contained in:
Erik Johnston 2019-06-07 12:10:23 +01:00
parent 8182a1cfb5
commit 2ebeda48b2
3 changed files with 70 additions and 8 deletions

View file

@ -114,6 +114,21 @@ class EmailPusher(object):
run_as_background_process("emailpush.process", self._process)
def _pause_processing(self):
"""Used by tests to temporarily pause processing of events.
Asserts that its not currently processing.
"""
assert not self._is_processing
self._is_processing = True
def _resume_processing(self):
"""Used by tests to resume processing of events after pausing.
"""
assert self._is_processing
self._is_processing = False
self._start_processing()
@defer.inlineCallbacks
def _process(self):
# we should never get here if we are already processing
@ -215,6 +230,10 @@ class EmailPusher(object):
@defer.inlineCallbacks
def save_last_stream_ordering_and_success(self, last_stream_ordering):
if last_stream_ordering is None:
# This happens if we haven't yet processed anything
return
self.last_stream_ordering = last_stream_ordering
yield self.store.update_pusher_last_stream_ordering_and_success(
self.app_id, self.email, self.user_id,