diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 260bdf0ec..f2c181dde 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -502,8 +502,8 @@ class EventsStore(SQLBaseStore): def _do_fetch(self, conn): event_list = [] - try: - while True: + while True: + try: logger.debug("do_fetch getting lock") with self._event_fetch_lock: logger.debug("do_fetch go lock: %r", self._event_fetch_list) @@ -543,16 +543,16 @@ class EventsStore(SQLBaseStore): except: logger.exception("Failed to callback") reactor.callFromThread(fire, event_list) - except Exception as e: - logger.exception("do_fetch") + except Exception as e: + logger.exception("do_fetch") - def fire(evs): - for _, d in evs: - if not d.called: - d.errback(e) + def fire(evs): + for _, d in evs: + if not d.called: + d.errback(e) - if event_list: - reactor.callFromThread(fire, event_list) + if event_list: + reactor.callFromThread(fire, event_list) @defer.inlineCallbacks def _enqueue_events(self, events, check_redacted=True, @@ -561,29 +561,26 @@ class EventsStore(SQLBaseStore): defer.returnValue({}) events_d = defer.Deferred() - try: - logger.debug("enqueueueueue getting lock") - with self._event_fetch_lock: - logger.debug("enqueue go lock") - self._event_fetch_list.append( - (events, events_d) - ) + logger.debug("enqueueueueue getting lock") + with self._event_fetch_lock: + logger.debug("enqueue go lock") + self._event_fetch_list.append( + (events, events_d) + ) + if self._event_fetch_ongoing < 1: self._event_fetch_ongoing += 1 + should_start = True + else: + should_start = False + if should_start: self.runWithConnection( self._do_fetch ) - except Exception as e: - if not events_d.called: - events_d.errback(e) - logger.debug("events_d before") - try: - rows = yield events_d - except: - logger.exception("events_d") + rows = yield events_d logger.debug("events_d after") res = yield defer.gatherResults(