From 4063fe0283b09a7a300dd41972629607b917e1e2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 Apr 2016 10:35:53 +0100 Subject: [PATCH] Update port script --- scripts/synapse_port_db | 125 ++++++++++++++++++++++++++-------------- 1 file changed, 82 insertions(+), 43 deletions(-) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 253a6ef6c..efd04da2d 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -214,6 +214,10 @@ class Porter(object): self.progress.add_table(table, postgres_size, table_size) + if table == "event_search": + yield self.handle_search_table(postgres_size, table_size, next_chunk) + return + select = ( "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" % (table,) @@ -232,51 +236,19 @@ class Porter(object): if rows: next_chunk = rows[-1][0] + 1 - if table == "event_search": - # We have to treat event_search differently since it has a - # different structure in the two different databases. - def insert(txn): - sql = ( - "INSERT INTO event_search (event_id, room_id, key, sender, vector)" - " VALUES (?,?,?,?,to_tsvector('english', ?))" - ) + self._convert_rows(table, headers, rows) - rows_dict = [ - dict(zip(headers, row)) - for row in rows - ] + def insert(txn): + self.postgres_store.insert_many_txn( + txn, table, headers[1:], rows + ) - txn.executemany(sql, [ - ( - row["event_id"], - row["room_id"], - row["key"], - row["sender"], - row["value"], - ) - for row in rows_dict - ]) - - self.postgres_store._simple_update_one_txn( - txn, - table="port_from_sqlite3", - keyvalues={"table_name": table}, - updatevalues={"rowid": next_chunk}, - ) - else: - self._convert_rows(table, headers, rows) - - def insert(txn): - self.postgres_store.insert_many_txn( - txn, table, headers[1:], rows - ) - - self.postgres_store._simple_update_one_txn( - txn, - table="port_from_sqlite3", - keyvalues={"table_name": table}, - updatevalues={"rowid": next_chunk}, - ) + self.postgres_store._simple_update_one_txn( + txn, + table="port_from_sqlite3", + keyvalues={"table_name": table}, + updatevalues={"rowid": next_chunk}, + ) yield self.postgres_store.execute(insert) @@ -286,6 +258,73 @@ class Porter(object): else: return + @defer.inlineCallbacks + def handle_search_table(self, postgres_size, table_size, next_chunk): + select = ( + "SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering" + " FROM event_search as es" + " INNER JOIN events AS e USING (event_id, room_id)" + " WHERE es.rowid >= ?" + " ORDER BY es.rowid LIMIT ?" + ) + + while True: + def r(txn): + txn.execute(select, (next_chunk, self.batch_size,)) + rows = txn.fetchall() + headers = [column[0] for column in txn.description] + + return headers, rows + + headers, rows = yield self.sqlite_store.runInteraction("select", r) + + if rows: + next_chunk = rows[-1][0] + 1 + + # We have to treat event_search differently since it has a + # different structure in the two different databases. + def insert(txn): + sql = ( + "INSERT INTO event_search (event_id, room_id, key," + " sender, vector, origin_server_ts, stream_ordering)" + " VALUES (?,?,?,?,to_tsvector('english', ?),?,?)" + ) + + rows_dict = [ + dict(zip(headers, row)) + for row in rows + ] + + txn.executemany(sql, [ + ( + row["event_id"], + row["room_id"], + row["key"], + row["sender"], + row["value"], + row["origin_server_ts"], + row["stream_ordering"], + ) + for row in rows_dict + ]) + + self.postgres_store._simple_update_one_txn( + txn, + table="port_from_sqlite3", + keyvalues={"table_name": "event_search"}, + updatevalues={"rowid": next_chunk}, + ) + + yield self.postgres_store.execute(insert) + + postgres_size += len(rows) + + self.progress.update("event_search", postgres_size) + + else: + return + + def setup_db(self, db_config, database_engine): db_conn = database_engine.module.connect( **{