Count and loop

This commit is contained in:
Erik Johnston 2015-05-14 15:40:21 +01:00
parent 96c5b9f87c
commit 142934084a
2 changed files with 32 additions and 34 deletions

View File

@ -301,7 +301,7 @@ class SQLBaseStore(object):
self._event_fetch_lock = threading.Lock() self._event_fetch_lock = threading.Lock()
self._event_fetch_list = [] self._event_fetch_list = []
self._event_fetch_ongoing = False self._event_fetch_ongoing = 0
self.database_engine = hs.database_engine self.database_engine = hs.database_engine

View File

@ -506,41 +506,39 @@ class EventsStore(SQLBaseStore):
def do_fetch(txn): def do_fetch(txn):
event_list = [] event_list = []
try: while True:
with self._event_fetch_lock: try:
event_list = self._event_fetch_list with self._event_fetch_lock:
self._event_fetch_list = [] event_list = self._event_fetch_list
self._event_fetch_list = []
if not event_list: if not event_list:
return return
event_id_lists = zip(*event_list)[0] event_id_lists = zip(*event_list)[0]
event_ids = [ event_ids = [
item for sublist in event_id_lists for item in sublist item for sublist in event_id_lists for item in sublist
] ]
rows = self._fetch_event_rows(txn, event_ids) rows = self._fetch_event_rows(txn, event_ids)
row_dict = { row_dict = {
r["event_id"]: r r["event_id"]: r
for r in rows for r in rows
} }
for ids, d in event_list: for ids, d in event_list:
d.callback( d.callback(
[ [
row_dict[i] for i in ids row_dict[i] for i in ids
if i in row_dict if i in row_dict
] ]
) )
except Exception as e: except Exception as e:
for _, d in event_list: for _, d in event_list:
try: try:
reactor.callFromThread(d.errback, e) reactor.callFromThread(d.errback, e)
except: except:
pass pass
finally:
with self._event_fetch_lock:
self._event_fetch_ongoing = False
def cb(rows): def cb(rows):
return defer.gatherResults([ return defer.gatherResults([
@ -561,12 +559,12 @@ class EventsStore(SQLBaseStore):
(events, d) (events, d)
) )
if not self._event_fetch_ongoing: if self._event_fetch_ongoing < 3:
self._event_fetch_ongoing += 1
self.runInteraction( self.runInteraction(
"do_fetch", "do_fetch",
do_fetch do_fetch
) )
self._event_fetch_ongoing = True
res = yield d res = yield d