mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-01-14 01:39:32 -05:00
Sort out error handling
This commit is contained in:
parent
a2c4f3f150
commit
de01438a57
@ -502,8 +502,8 @@ class EventsStore(SQLBaseStore):
|
|||||||
|
|
||||||
def _do_fetch(self, conn):
|
def _do_fetch(self, conn):
|
||||||
event_list = []
|
event_list = []
|
||||||
try:
|
while True:
|
||||||
while True:
|
try:
|
||||||
logger.debug("do_fetch getting lock")
|
logger.debug("do_fetch getting lock")
|
||||||
with self._event_fetch_lock:
|
with self._event_fetch_lock:
|
||||||
logger.debug("do_fetch go lock: %r", self._event_fetch_list)
|
logger.debug("do_fetch go lock: %r", self._event_fetch_list)
|
||||||
@ -543,16 +543,16 @@ class EventsStore(SQLBaseStore):
|
|||||||
except:
|
except:
|
||||||
logger.exception("Failed to callback")
|
logger.exception("Failed to callback")
|
||||||
reactor.callFromThread(fire, event_list)
|
reactor.callFromThread(fire, event_list)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception("do_fetch")
|
logger.exception("do_fetch")
|
||||||
|
|
||||||
def fire(evs):
|
def fire(evs):
|
||||||
for _, d in evs:
|
for _, d in evs:
|
||||||
if not d.called:
|
if not d.called:
|
||||||
d.errback(e)
|
d.errback(e)
|
||||||
|
|
||||||
if event_list:
|
if event_list:
|
||||||
reactor.callFromThread(fire, event_list)
|
reactor.callFromThread(fire, event_list)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _enqueue_events(self, events, check_redacted=True,
|
def _enqueue_events(self, events, check_redacted=True,
|
||||||
@ -561,29 +561,26 @@ class EventsStore(SQLBaseStore):
|
|||||||
defer.returnValue({})
|
defer.returnValue({})
|
||||||
|
|
||||||
events_d = defer.Deferred()
|
events_d = defer.Deferred()
|
||||||
try:
|
logger.debug("enqueueueueue getting lock")
|
||||||
logger.debug("enqueueueueue getting lock")
|
with self._event_fetch_lock:
|
||||||
with self._event_fetch_lock:
|
logger.debug("enqueue go lock")
|
||||||
logger.debug("enqueue go lock")
|
self._event_fetch_list.append(
|
||||||
self._event_fetch_list.append(
|
(events, events_d)
|
||||||
(events, events_d)
|
)
|
||||||
)
|
|
||||||
|
|
||||||
|
if self._event_fetch_ongoing < 1:
|
||||||
self._event_fetch_ongoing += 1
|
self._event_fetch_ongoing += 1
|
||||||
|
should_start = True
|
||||||
|
else:
|
||||||
|
should_start = False
|
||||||
|
|
||||||
|
if should_start:
|
||||||
self.runWithConnection(
|
self.runWithConnection(
|
||||||
self._do_fetch
|
self._do_fetch
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
if not events_d.called:
|
|
||||||
events_d.errback(e)
|
|
||||||
|
|
||||||
logger.debug("events_d before")
|
logger.debug("events_d before")
|
||||||
try:
|
rows = yield events_d
|
||||||
rows = yield events_d
|
|
||||||
except:
|
|
||||||
logger.exception("events_d")
|
|
||||||
logger.debug("events_d after")
|
logger.debug("events_d after")
|
||||||
|
|
||||||
res = yield defer.gatherResults(
|
res = yield defer.gatherResults(
|
||||||
|
Loading…
Reference in New Issue
Block a user