diff --git a/changelog.d/4189.misc b/changelog.d/4189.misc new file mode 100644 index 000000000..4a41357d9 --- /dev/null +++ b/changelog.d/4189.misc @@ -0,0 +1,2 @@ +Run the AS senders as background processes to fix warnings + diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 243081479..685f15c06 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -53,8 +53,8 @@ import logging from twisted.internet import defer from synapse.appservice import ApplicationServiceState +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.logcontext import run_in_background -from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -104,14 +104,23 @@ class _ServiceQueuer(object): self.clock = clock def enqueue(self, service, event): - # if this service isn't being sent something self.queued_events.setdefault(service.id, []).append(event) - run_in_background(self._send_request, service) + + # start a sender for this appservice if we don't already have one + + if service.id in self.requests_in_flight: + return + + run_as_background_process( + "as-sender-%s" % (service.id, ), + self._send_request, service, + ) @defer.inlineCallbacks def _send_request(self, service): - if service.id in self.requests_in_flight: - return + # sanity-check: we shouldn't get here if this service already has a sender + # running. + assert(service.id not in self.requests_in_flight) self.requests_in_flight.add(service.id) try: @@ -119,12 +128,10 @@ class _ServiceQueuer(object): events = self.queued_events.pop(service.id, []) if not events: return - - with Measure(self.clock, "servicequeuer.send"): - try: - yield self.txn_ctrl.send(service, events) - except Exception: - logger.exception("AS request failed") + try: + yield self.txn_ctrl.send(service, events) + except Exception: + logger.exception("AS request failed") finally: self.requests_in_flight.discard(service.id) @@ -223,7 +230,12 @@ class _Recoverer(object): self.backoff_counter = 1 def recover(self): - self.clock.call_later((2 ** self.backoff_counter), self.retry) + def _retry(): + run_as_background_process( + "as-recoverer-%s" % (self.service.id,), + self.retry, + ) + self.clock.call_later((2 ** self.backoff_counter), _retry) def _backoff(self): # cap the backoff to be around 8.5min => (2^9) = 512 secs