From 63677d1f4752fd928fb2b6f1e97c819896bc5323 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Apr 2015 10:23:24 +0100 Subject: [PATCH] Change port script to work with postgres --- scripts/port_to_maria.py | 114 +++++++++++++++++++++++---------------- 1 file changed, 67 insertions(+), 47 deletions(-) diff --git a/scripts/port_to_maria.py b/scripts/port_to_maria.py index b14cca891..0d7ba9235 100644 --- a/scripts/port_to_maria.py +++ b/scripts/port_to_maria.py @@ -22,6 +22,7 @@ from synapse.storage.engines import create_engine import argparse import itertools import logging +import types import yaml @@ -51,6 +52,14 @@ UNICODE_COLUMNS = { } +BOOLEAN_COLUMNS = { + "events": ["processed", "outlier"], + "rooms": ["is_public"], + "event_edges": ["is_state"], + "presence_list": ["accepted"], +} + + APPEND_ONLY_TABLES = [ "event_content_hashes", "event_reference_hashes", @@ -126,24 +135,22 @@ class Store(object): return self.db_pool.runWithConnection(r) - def insert_many(self, table, headers, rows): + def insert_many_txn(self, txn, table, headers, rows): sql = "INSERT INTO %s (%s) VALUES (%s)" % ( table, ", ".join(k for k in headers), ", ".join("%s" for _ in headers) ) - def t(txn): - try: - txn.executemany(sql, rows) - except: - logger.exception( - "Failed to insert: %s", - table, - ) - raise + try: + txn.executemany(sql, rows) + except: + logger.exception( + "Failed to insert: %s", + table, + ) + raise - return self.runInteraction("insert_many", t) def chunks(n): @@ -175,7 +182,7 @@ def handle_table(table, sqlite_store, mysql_store): "DELETE FROM port_from_sqlite3 WHERE table_name = %s", (table,) ) - txn.execute("DELETE FROM %s" % (table,)) + txn.execute("TRUNCATE %s CASCADE" % (table,)) mysql_store._simple_insert_txn( txn, table="port_from_sqlite3", @@ -188,14 +195,15 @@ def handle_table(table, sqlite_store, mysql_store): next_chunk = 0 + logger.info("next_chunk for %s: %d", table, next_chunk) + N = 5000 select = "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" % (table,) uni_col_names = UNICODE_COLUMNS.get(table, []) - - def conv_uni(c): - return sqlite_store.database_engine.load_unicode(c) + bool_col_names = BOOLEAN_COLUMNS.get(table, []) + bin_col_names = BINARY_COLUMNS.get(table, []) while True: def r(txn): @@ -211,24 +219,42 @@ def handle_table(table, sqlite_store, mysql_store): if rows: uni_cols = [i for i, h in enumerate(headers) if h in uni_col_names] + bool_cols = [i for i, h in enumerate(headers) if h in bool_col_names] + bin_cols = [i for i, h in enumerate(headers) if h in bin_col_names] next_chunk = rows[-1][0] + 1 + def conv(j, col): + if j in uni_cols: + col = sqlite_store.database_engine.load_unicode(col) + if j in bool_cols: + return bool(col) + + if j in bin_cols: + if isinstance(col, types.UnicodeType): + col = buffer(col.encode("utf8")) + + return col + for i, row in enumerate(rows): rows[i] = tuple( mysql_store.database_engine.encode_parameter( - conv_uni(col) if j in uni_cols else col + conv(j, col) ) for j, col in enumerate(row) if j > 0 ) - yield mysql_store.insert_many(table, headers[1:], rows) + def ins(txn): + mysql_store.insert_many_txn(txn, table, headers[1:], rows) - yield mysql_store._simple_update_one( - table="port_from_sqlite3", - keyvalues={"table_name": table}, - updatevalues={"rowid": next_chunk}, - ) + mysql_store._simple_update_one_txn( + txn, + table="port_from_sqlite3", + keyvalues={"table_name": table}, + updatevalues={"rowid": next_chunk}, + ) + + yield mysql_store.runInteraction("insert_many", ins) else: return @@ -260,7 +286,7 @@ def main(sqlite_config, mysql_config): ) sqlite_engine = create_engine("sqlite3") - mysql_engine = create_engine("mysql.connector") + mysql_engine = create_engine("psycopg2") sqlite_store = Store(sqlite_db_pool, sqlite_engine) mysql_store = Store(mysql_db_pool, mysql_engine) @@ -285,20 +311,19 @@ def main(sqlite_config, mysql_config): logger.info("Found %d tables", len(tables)) def create_port_table(txn): - try: - txn.execute( - "CREATE TABLE port_from_sqlite3 (" - " `table_name` varchar(100) NOT NULL UNIQUE," - " `rowid` bigint unsigned NOT NULL" - ")" - ) - except mysql_engine.module.DatabaseError as e: - if e.errno != mysql_engine.module.errorcode.ER_TABLE_EXISTS_ERROR: - raise + txn.execute( + "CREATE TABLE port_from_sqlite3 (" + " table_name varchar(100) NOT NULL UNIQUE," + " rowid bigint NOT NULL" + ")" + ) - yield mysql_store.runInteraction( - "create_port_table", create_port_table - ) + try: + yield mysql_store.runInteraction( + "create_port_table", create_port_table + ) + except Exception as e: + logger.info("Failed to create port table: %s", e) # Process tables. yield defer.gatherResults( @@ -342,17 +367,12 @@ if __name__ == "__main__": } mysql_config = yaml.safe_load(args.mysql_config) - mysql_config["args"].update({ - "sql_mode": "TRADITIONAL", - "charset": "utf8mb4", - "use_unicode": True, - "collation": "utf8mb4_bin", - }) - - import codecs - codecs.register( - lambda name: codecs.lookup('utf8') if name == "utf8mb4" else None - ) + # mysql_config["args"].update({ + # "sql_mode": "TRADITIONAL", + # "charset": "utf8mb4", + # "use_unicode": True, + # "collation": "utf8mb4_bin", + # }) reactor.callWhenRunning( main,