Refuse to upgrade database on worker processes (#8266)

This commit is contained in:
Richard van der Hoff 2020-09-07 13:04:10 +01:00 committed by GitHub
parent f25af1f9c7
commit 96312536f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 62 additions and 17 deletions

1
changelog.d/8266.misc Normal file
View File

@ -0,0 +1 @@
Do not attempt to upgrade upgrade database schema on worker processes.

View File

@ -47,6 +47,22 @@ class UpgradeDatabaseException(PrepareDatabaseException):
pass pass
OUTDATED_SCHEMA_ON_WORKER_ERROR = (
"Expected database schema version %i but got %i: run the main synapse process to "
"upgrade the database schema before starting worker processes."
)
EMPTY_DATABASE_ON_WORKER_ERROR = (
"Uninitialised database: run the main synapse process to prepare the database "
"schema before starting worker processes."
)
UNAPPLIED_DELTA_ON_WORKER_ERROR = (
"Database schema delta %s has not been applied: run the main synapse process to "
"upgrade the database schema before starting worker processes."
)
def prepare_database(db_conn, database_engine, config, databases=["main", "state"]): def prepare_database(db_conn, database_engine, config, databases=["main", "state"]):
"""Prepares a physical database for usage. Will either create all necessary tables """Prepares a physical database for usage. Will either create all necessary tables
or upgrade from an older schema version. or upgrade from an older schema version.
@ -71,15 +87,20 @@ def prepare_database(db_conn, database_engine, config, databases=["main", "state
if version_info: if version_info:
user_version, delta_files, upgraded = version_info user_version, delta_files, upgraded = version_info
# config should only be None when we are preparing an in-memory SQLite db,
# which should be empty.
if config is None: if config is None:
if user_version != SCHEMA_VERSION: raise ValueError(
# If we don't pass in a config file then we are expecting to "config==None in prepare_database, but databse is not empty"
# have already upgraded the DB.
raise UpgradeDatabaseException(
"Expected database schema version %i but got %i"
% (SCHEMA_VERSION, user_version)
) )
else:
# if it's a worker app, refuse to upgrade the database, to avoid multiple
# workers doing it at once.
if config.worker_app is not None and user_version != SCHEMA_VERSION:
raise UpgradeDatabaseException(
OUTDATED_SCHEMA_ON_WORKER_ERROR % (SCHEMA_VERSION, user_version)
)
_upgrade_existing_database( _upgrade_existing_database(
cur, cur,
user_version, user_version,
@ -90,6 +111,11 @@ def prepare_database(db_conn, database_engine, config, databases=["main", "state
databases=databases, databases=databases,
) )
else: else:
# if it's a worker app, refuse to upgrade the database, to avoid multiple
# workers doing it at once.
if config and config.worker_app is not None:
raise UpgradeDatabaseException(EMPTY_DATABASE_ON_WORKER_ERROR)
_setup_new_database(cur, database_engine, databases=databases) _setup_new_database(cur, database_engine, databases=databases)
# check if any of our configured dynamic modules want a database # check if any of our configured dynamic modules want a database
@ -295,6 +321,8 @@ def _upgrade_existing_database(
else: else:
assert config assert config
is_worker = config and config.worker_app is not None
if current_version > SCHEMA_VERSION: if current_version > SCHEMA_VERSION:
raise ValueError( raise ValueError(
"Cannot use this database as it is too " "Cannot use this database as it is too "
@ -322,7 +350,7 @@ def _upgrade_existing_database(
specific_engine_extensions = (".sqlite", ".postgres") specific_engine_extensions = (".sqlite", ".postgres")
for v in range(start_ver, SCHEMA_VERSION + 1): for v in range(start_ver, SCHEMA_VERSION + 1):
logger.info("Upgrading schema to v%d", v) logger.info("Applying schema deltas for v%d", v)
# We need to search both the global and per data store schema # We need to search both the global and per data store schema
# directories for schema updates. # directories for schema updates.
@ -382,9 +410,15 @@ def _upgrade_existing_database(
continue continue
root_name, ext = os.path.splitext(file_name) root_name, ext = os.path.splitext(file_name)
if ext == ".py": if ext == ".py":
# This is a python upgrade module. We need to import into some # This is a python upgrade module. We need to import into some
# package and then execute its `run_upgrade` function. # package and then execute its `run_upgrade` function.
if is_worker:
raise PrepareDatabaseException(
UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
)
module_name = "synapse.storage.v%d_%s" % (v, root_name) module_name = "synapse.storage.v%d_%s" % (v, root_name)
with open(absolute_path) as python_file: with open(absolute_path) as python_file:
module = imp.load_source(module_name, absolute_path, python_file) module = imp.load_source(module_name, absolute_path, python_file)
@ -399,10 +433,18 @@ def _upgrade_existing_database(
continue continue
elif ext == ".sql": elif ext == ".sql":
# A plain old .sql file, just read and execute it # A plain old .sql file, just read and execute it
if is_worker:
raise PrepareDatabaseException(
UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
)
logger.info("Applying schema %s", relative_path) logger.info("Applying schema %s", relative_path)
executescript(cur, absolute_path) executescript(cur, absolute_path)
elif ext == specific_engine_extension and root_name.endswith(".sql"): elif ext == specific_engine_extension and root_name.endswith(".sql"):
# A .sql file specific to our engine; just read and execute it # A .sql file specific to our engine; just read and execute it
if is_worker:
raise PrepareDatabaseException(
UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
)
logger.info("Applying engine-specific schema %s", relative_path) logger.info("Applying engine-specific schema %s", relative_path)
executescript(cur, absolute_path) executescript(cur, absolute_path)
elif ext in specific_engine_extensions and root_name.endswith(".sql"): elif ext in specific_engine_extensions and root_name.endswith(".sql"):
@ -432,6 +474,8 @@ def _upgrade_existing_database(
(v, True), (v, True),
) )
logger.info("Schema now up to date")
def _apply_module_schemas(txn, database_engine, config): def _apply_module_schemas(txn, database_engine, config):
"""Apply the module schemas for the dynamic modules, if any """Apply the module schemas for the dynamic modules, if any