diff --git a/scripts/port_to_maria.py b/scripts/port_to_maria.py index 6e0adf703..b14cca891 100644 --- a/scripts/port_to_maria.py +++ b/scripts/port_to_maria.py @@ -51,16 +51,52 @@ UNICODE_COLUMNS = { } +APPEND_ONLY_TABLES = [ + "event_content_hashes", + "event_reference_hashes", + "event_signatures", + "event_edge_hashes", + "events", + "event_json", + "state_events", + "room_memberships", + "feedback", + "topics", + "room_names", + "rooms", + "local_media_repository", + "local_media_repository_thumbnails", + "remote_media_cache", + "remote_media_cache_thumbnails", + "redactions", + "event_edges", + "event_auth", + "received_transactions", + "sent_transactions", + "transaction_id_to_pdu", + "users", + "state_groups", + "state_groups_state", + "event_to_state_groups", + "rejections", +] + + class Store(object): def __init__(self, db_pool, engine): self.db_pool = db_pool - self.engine = engine + self.database_engine = engine _simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"] _simple_insert = SQLBaseStore.__dict__["_simple_insert"] _simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"] _simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"] + _simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"] + _simple_select_one_onecol_txn = SQLBaseStore.__dict__["_simple_select_one_onecol_txn"] + + _simple_update_one = SQLBaseStore.__dict__["_simple_update_one"] + _simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"] _execute_and_decode = SQLBaseStore.__dict__["_execute_and_decode"] @@ -73,11 +109,11 @@ class Store(object): try: txn = conn.cursor() return func( - LoggingTransaction(txn, desc, self.engine), + LoggingTransaction(txn, desc, self.database_engine), *args, **kwargs ) - except self.engine.module.DatabaseError as e: - if self.engine.is_deadlock(e): + except self.database_engine.module.DatabaseError as e: + if self.database_engine.is_deadlock(e): logger.warn("[TXN DEADLOCK] {%s} %d/%d", desc, i, N) if i < N: i += 1 @@ -117,16 +153,50 @@ def chunks(n): @defer.inlineCallbacks def handle_table(table, sqlite_store, mysql_store): - N = 1000 + if table in APPEND_ONLY_TABLES: + # It's safe to just carry on inserting. + next_chunk = yield mysql_store._simple_select_one_onecol( + table="port_from_sqlite3", + keyvalues={"table_name": table}, + retcol="rowid", + allow_none=True, + ) + + if next_chunk is None: + yield mysql_store._simple_insert( + table="port_from_sqlite3", + values={"table_name": table, "rowid": 0} + ) + + next_chunk = 0 + else: + def delete_all(txn): + txn.execute( + "DELETE FROM port_from_sqlite3 WHERE table_name = %s", + (table,) + ) + txn.execute("DELETE FROM %s" % (table,)) + mysql_store._simple_insert_txn( + txn, + table="port_from_sqlite3", + values={"table_name": table, "rowid": 0} + ) + + yield mysql_store.runInteraction( + "delete_non_append_only", delete_all + ) + + next_chunk = 0 + + N = 5000 select = "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" % (table,) uni_col_names = UNICODE_COLUMNS.get(table, []) def conv_uni(c): - return sqlite_store.engine.load_unicode(c) + return sqlite_store.database_engine.load_unicode(c) - next_chunk = 0 while True: def r(txn): txn.execute(select, (next_chunk, N,)) @@ -145,7 +215,7 @@ def handle_table(table, sqlite_store, mysql_store): for i, row in enumerate(rows): rows[i] = tuple( - mysql_store.engine.encode_parameter( + mysql_store.database_engine.encode_parameter( conv_uni(col) if j in uni_cols else col ) for j, col in enumerate(row) @@ -153,6 +223,12 @@ def handle_table(table, sqlite_store, mysql_store): ) yield mysql_store.insert_many(table, headers[1:], rows) + + yield mysql_store._simple_update_one( + table="port_from_sqlite3", + keyvalues={"table_name": table}, + updatevalues={"rowid": next_chunk}, + ) else: return @@ -208,6 +284,22 @@ def main(sqlite_config, mysql_config): logger.info("Found %d tables", len(tables)) + def create_port_table(txn): + try: + txn.execute( + "CREATE TABLE port_from_sqlite3 (" + " `table_name` varchar(100) NOT NULL UNIQUE," + " `rowid` bigint unsigned NOT NULL" + ")" + ) + except mysql_engine.module.DatabaseError as e: + if e.errno != mysql_engine.module.errorcode.ER_TABLE_EXISTS_ERROR: + raise + + yield mysql_store.runInteraction( + "create_port_table", create_port_table + ) + # Process tables. yield defer.gatherResults( [