Convert sending mail to async/await. (#7557)

Mainly because sometimes the email push code raises exceptions where the
stack traces have gotten lost, which is hopefully fixed by this.
This commit is contained in:
Erik Johnston 2020-05-22 13:41:11 +01:00 committed by GitHub
parent 66f2ebc22f
commit 06a02bc1ce
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 60 additions and 76 deletions

View file

@ -15,7 +15,6 @@
import logging
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from synapse.metrics.background_process_metrics import run_as_background_process
@ -132,8 +131,7 @@ class EmailPusher(object):
self._is_processing = False
self._start_processing()
@defer.inlineCallbacks
def _process(self):
async def _process(self):
# we should never get here if we are already processing
assert not self._is_processing
@ -142,7 +140,7 @@ class EmailPusher(object):
if self.throttle_params is None:
# this is our first loop: load up the throttle params
self.throttle_params = yield self.store.get_throttle_params_by_room(
self.throttle_params = await self.store.get_throttle_params_by_room(
self.pusher_id
)
@ -151,7 +149,7 @@ class EmailPusher(object):
while True:
starting_max_ordering = self.max_stream_ordering
try:
yield self._unsafe_process()
await self._unsafe_process()
except Exception:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
@ -159,8 +157,7 @@ class EmailPusher(object):
finally:
self._is_processing = False
@defer.inlineCallbacks
def _unsafe_process(self):
async def _unsafe_process(self):
"""
Main logic of the push loop without the wrapper function that sets
up logging, measures and guards against multiple instances of it
@ -168,12 +165,12 @@ class EmailPusher(object):
"""
start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
fn = self.store.get_unread_push_actions_for_user_in_range_for_email
unprocessed = yield fn(self.user_id, start, self.max_stream_ordering)
unprocessed = await fn(self.user_id, start, self.max_stream_ordering)
soonest_due_at = None
if not unprocessed:
yield self.save_last_stream_ordering_and_success(self.max_stream_ordering)
await self.save_last_stream_ordering_and_success(self.max_stream_ordering)
return
for push_action in unprocessed:
@ -201,15 +198,15 @@ class EmailPusher(object):
"throttle_ms": self.get_room_throttle_ms(push_action["room_id"]),
}
yield self.send_notification(unprocessed, reason)
await self.send_notification(unprocessed, reason)
yield self.save_last_stream_ordering_and_success(
await self.save_last_stream_ordering_and_success(
max(ea["stream_ordering"] for ea in unprocessed)
)
# we update the throttle on all the possible unprocessed push actions
for ea in unprocessed:
yield self.sent_notif_update_throttle(ea["room_id"], ea)
await self.sent_notif_update_throttle(ea["room_id"], ea)
break
else:
if soonest_due_at is None or should_notify_at < soonest_due_at:
@ -227,14 +224,13 @@ class EmailPusher(object):
self.seconds_until(soonest_due_at), self.on_timer
)
@defer.inlineCallbacks
def save_last_stream_ordering_and_success(self, last_stream_ordering):
async def save_last_stream_ordering_and_success(self, last_stream_ordering):
if last_stream_ordering is None:
# This happens if we haven't yet processed anything
return
self.last_stream_ordering = last_stream_ordering
pusher_still_exists = yield self.store.update_pusher_last_stream_ordering_and_success(
pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
self.email,
self.user_id,
@ -275,13 +271,12 @@ class EmailPusher(object):
may_send_at = last_sent_ts + throttle_ms
return may_send_at
@defer.inlineCallbacks
def sent_notif_update_throttle(self, room_id, notified_push_action):
async def sent_notif_update_throttle(self, room_id, notified_push_action):
# We have sent a notification, so update the throttle accordingly.
# If the event that triggered the notif happened more than
# THROTTLE_RESET_AFTER_MS after the previous one that triggered a
# notif, we release the throttle. Otherwise, the throttle is increased.
time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before(
time_of_previous_notifs = await self.store.get_time_of_last_push_action_before(
notified_push_action["stream_ordering"]
)
@ -310,14 +305,13 @@ class EmailPusher(object):
"last_sent_ts": self.clock.time_msec(),
"throttle_ms": new_throttle_ms,
}
yield self.store.set_throttle_params(
await self.store.set_throttle_params(
self.pusher_id, room_id, self.throttle_params[room_id]
)
@defer.inlineCallbacks
def send_notification(self, push_actions, reason):
async def send_notification(self, push_actions, reason):
logger.info("Sending notif email for user %r", self.user_id)
yield self.mailer.send_notification_mail(
await self.mailer.send_notification_mail(
self.app_id, self.user_id, self.email, push_actions, reason
)