Fix background reindex of origin_server_ts

The storage function `_get_events_txn` was removed from the codebase, but
this background reindex still called it. The function was removed because
it was (almost) completely unused as well as being large and complex.
Therefore, instead of resurrecting `_get_events_txn`, we directly
reimplement the bits that this reindex needs.
This commit is contained in:
Erik Johnston 2016-09-27 09:43:27 +01:00
parent a38d46249e
commit 9040c9ffa1

View File

@@ -1355,39 +1355,53 @@ class EventsStore(SQLBaseStore):
min_stream_id = rows[-1][0] min_stream_id = rows[-1][0]
event_ids = [row[1] for row in rows] event_ids = [row[1] for row in rows]
events = self._get_events_txn(txn, event_ids) rows_to_update = []
rows = [] chunks = [
for event in events: event_ids[i:i + 100]
try: for i in xrange(0, len(event_ids), 100)
event_id = event.event_id ]
origin_server_ts = event.origin_server_ts for chunk in chunks:
except (KeyError, AttributeError): ev_rows = self._simple_select_many_txn(
# If the event is missing a necessary field then txn,
# skip over it. table="event_json",
continue column="event_id",
iterable=chunk,
retcols=["event_id", "json"],
keyvalues={},
)
rows.append((origin_server_ts, event_id)) for row in ev_rows:
event_id = row["event_id"]
event_json = json.loads(row["json"])
try:
origin_server_ts = event_json["origin_server_ts"]
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
continue
rows_to_update.append((origin_server_ts, event_id))
sql = ( sql = (
"UPDATE events SET origin_server_ts = ? WHERE event_id = ?" "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
) )
for index in range(0, len(rows), INSERT_CLUMP_SIZE): for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
clump = rows[index:index + INSERT_CLUMP_SIZE] clump = rows_to_update[index:index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump) txn.executemany(sql, clump)
progress = { progress = {
"target_min_stream_id_inclusive": target_min_stream_id, "target_min_stream_id_inclusive": target_min_stream_id,
"max_stream_id_exclusive": min_stream_id, "max_stream_id_exclusive": min_stream_id,
"rows_inserted": rows_inserted + len(rows) "rows_inserted": rows_inserted + len(rows_to_update)
} }
self._background_update_progress_txn( self._background_update_progress_txn(
txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
) )
return len(rows) return len(rows_to_update)
result = yield self.runInteraction( result = yield self.runInteraction(
self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn