mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-05-02 12:16:09 -04:00
Support for database schema version ranges (#9933)
This is essentially an implementation of the proposal made at https://hackmd.io/@richvdh/BJYXQMQHO, though the details have ended up looking slightly different.
This commit is contained in:
parent
a14884fbb0
commit
c1b9922498
7 changed files with 207 additions and 78 deletions
|
@ -1,5 +1,4 @@
|
|||
# Copyright 2014 - 2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
# Copyright 2014 - 2021 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
|
@ -26,7 +25,7 @@ from synapse.config.homeserver import HomeServerConfig
|
|||
from synapse.storage.database import LoggingDatabaseConnection
|
||||
from synapse.storage.engines import BaseDatabaseEngine
|
||||
from synapse.storage.engines.postgres import PostgresEngine
|
||||
from synapse.storage.schema import SCHEMA_VERSION
|
||||
from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION
|
||||
from synapse.storage.types import Cursor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -59,6 +58,28 @@ UNAPPLIED_DELTA_ON_WORKER_ERROR = (
|
|||
)
|
||||
|
||||
|
||||
@attr.s
|
||||
class _SchemaState:
|
||||
current_version: int = attr.ib()
|
||||
"""The current schema version of the database"""
|
||||
|
||||
compat_version: Optional[int] = attr.ib()
|
||||
"""The SCHEMA_VERSION of the oldest version of Synapse for this database
|
||||
|
||||
If this is None, we have an old version of the database without the necessary
|
||||
table.
|
||||
"""
|
||||
|
||||
applied_deltas: Collection[str] = attr.ib(factory=tuple)
|
||||
"""Any delta files for `current_version` which have already been applied"""
|
||||
|
||||
upgraded: bool = attr.ib(default=False)
|
||||
"""Whether the current state was reached by applying deltas.
|
||||
|
||||
If False, we have run the full schema for `current_version`, and have applied no
|
||||
deltas since. If True, we have run some deltas since the original creation."""
|
||||
|
||||
|
||||
def prepare_database(
|
||||
db_conn: LoggingDatabaseConnection,
|
||||
database_engine: BaseDatabaseEngine,
|
||||
|
@ -96,12 +117,11 @@ def prepare_database(
|
|||
version_info = _get_or_create_schema_state(cur, database_engine)
|
||||
|
||||
if version_info:
|
||||
user_version, delta_files, upgraded = version_info
|
||||
logger.info(
|
||||
"%r: Existing schema is %i (+%i deltas)",
|
||||
databases,
|
||||
user_version,
|
||||
len(delta_files),
|
||||
version_info.current_version,
|
||||
len(version_info.applied_deltas),
|
||||
)
|
||||
|
||||
# config should only be None when we are preparing an in-memory SQLite db,
|
||||
|
@ -113,16 +133,18 @@ def prepare_database(
|
|||
|
||||
# if it's a worker app, refuse to upgrade the database, to avoid multiple
|
||||
# workers doing it at once.
|
||||
if config.worker_app is not None and user_version != SCHEMA_VERSION:
|
||||
if (
|
||||
config.worker_app is not None
|
||||
and version_info.current_version != SCHEMA_VERSION
|
||||
):
|
||||
raise UpgradeDatabaseException(
|
||||
OUTDATED_SCHEMA_ON_WORKER_ERROR % (SCHEMA_VERSION, user_version)
|
||||
OUTDATED_SCHEMA_ON_WORKER_ERROR
|
||||
% (SCHEMA_VERSION, version_info.current_version)
|
||||
)
|
||||
|
||||
_upgrade_existing_database(
|
||||
cur,
|
||||
user_version,
|
||||
delta_files,
|
||||
upgraded,
|
||||
version_info,
|
||||
database_engine,
|
||||
config,
|
||||
databases=databases,
|
||||
|
@ -261,9 +283,7 @@ def _setup_new_database(
|
|||
|
||||
_upgrade_existing_database(
|
||||
cur,
|
||||
current_version=max_current_ver,
|
||||
applied_delta_files=[],
|
||||
upgraded=False,
|
||||
_SchemaState(current_version=max_current_ver, compat_version=None),
|
||||
database_engine=database_engine,
|
||||
config=None,
|
||||
databases=databases,
|
||||
|
@ -273,9 +293,7 @@ def _setup_new_database(
|
|||
|
||||
def _upgrade_existing_database(
|
||||
cur: Cursor,
|
||||
current_version: int,
|
||||
applied_delta_files: List[str],
|
||||
upgraded: bool,
|
||||
current_schema_state: _SchemaState,
|
||||
database_engine: BaseDatabaseEngine,
|
||||
config: Optional[HomeServerConfig],
|
||||
databases: Collection[str],
|
||||
|
@ -321,12 +339,8 @@ def _upgrade_existing_database(
|
|||
|
||||
Args:
|
||||
cur
|
||||
current_version: The current version of the schema.
|
||||
applied_delta_files: A list of deltas that have already been applied.
|
||||
upgraded: Whether the current version was generated by having
|
||||
applied deltas or from full schema file. If `True` the function
|
||||
will never apply delta files for the given `current_version`, since
|
||||
the current_version wasn't generated by applying those delta files.
|
||||
current_schema_state: The current version of the schema, as
|
||||
returned by _get_or_create_schema_state
|
||||
database_engine
|
||||
config:
|
||||
None if we are initialising a blank database, otherwise the application
|
||||
|
@ -337,13 +351,16 @@ def _upgrade_existing_database(
|
|||
upgrade portions of the delta scripts.
|
||||
"""
|
||||
if is_empty:
|
||||
assert not applied_delta_files
|
||||
assert not current_schema_state.applied_deltas
|
||||
else:
|
||||
assert config
|
||||
|
||||
is_worker = config and config.worker_app is not None
|
||||
|
||||
if current_version > SCHEMA_VERSION:
|
||||
if (
|
||||
current_schema_state.compat_version is not None
|
||||
and current_schema_state.compat_version > SCHEMA_VERSION
|
||||
):
|
||||
raise ValueError(
|
||||
"Cannot use this database as it is too "
|
||||
+ "new for the server to understand"
|
||||
|
@ -357,14 +374,26 @@ def _upgrade_existing_database(
|
|||
assert config is not None
|
||||
check_database_before_upgrade(cur, database_engine, config)
|
||||
|
||||
start_ver = current_version
|
||||
# update schema_compat_version before we run any upgrades, so that if synapse
|
||||
# gets downgraded again, it won't try to run against the upgraded database.
|
||||
if (
|
||||
current_schema_state.compat_version is None
|
||||
or current_schema_state.compat_version < SCHEMA_COMPAT_VERSION
|
||||
):
|
||||
cur.execute("DELETE FROM schema_compat_version")
|
||||
cur.execute(
|
||||
"INSERT INTO schema_compat_version(compat_version) VALUES (?)",
|
||||
(SCHEMA_COMPAT_VERSION,),
|
||||
)
|
||||
|
||||
start_ver = current_schema_state.current_version
|
||||
|
||||
# if we got to this schema version by running a full_schema rather than a series
|
||||
# of deltas, we should not run the deltas for this version.
|
||||
if not upgraded:
|
||||
if not current_schema_state.upgraded:
|
||||
start_ver += 1
|
||||
|
||||
logger.debug("applied_delta_files: %s", applied_delta_files)
|
||||
logger.debug("applied_delta_files: %s", current_schema_state.applied_deltas)
|
||||
|
||||
if isinstance(database_engine, PostgresEngine):
|
||||
specific_engine_extension = ".postgres"
|
||||
|
@ -440,7 +469,7 @@ def _upgrade_existing_database(
|
|||
absolute_path = entry.absolute_path
|
||||
|
||||
logger.debug("Found file: %s (%s)", relative_path, absolute_path)
|
||||
if relative_path in applied_delta_files:
|
||||
if relative_path in current_schema_state.applied_deltas:
|
||||
continue
|
||||
|
||||
root_name, ext = os.path.splitext(file_name)
|
||||
|
@ -621,7 +650,7 @@ def execute_statements_from_stream(cur: Cursor, f: TextIO) -> None:
|
|||
|
||||
def _get_or_create_schema_state(
|
||||
txn: Cursor, database_engine: BaseDatabaseEngine
|
||||
) -> Optional[Tuple[int, List[str], bool]]:
|
||||
) -> Optional[_SchemaState]:
|
||||
# Bluntly try creating the schema_version tables.
|
||||
sql_path = os.path.join(schema_path, "common", "schema_version.sql")
|
||||
executescript(txn, sql_path)
|
||||
|
@ -629,17 +658,31 @@ def _get_or_create_schema_state(
|
|||
txn.execute("SELECT version, upgraded FROM schema_version")
|
||||
row = txn.fetchone()
|
||||
|
||||
if row is not None:
|
||||
current_version = int(row[0])
|
||||
txn.execute(
|
||||
"SELECT file FROM applied_schema_deltas WHERE version >= ?",
|
||||
(current_version,),
|
||||
)
|
||||
applied_deltas = [d for d, in txn]
|
||||
upgraded = bool(row[1])
|
||||
return current_version, applied_deltas, upgraded
|
||||
if row is None:
|
||||
# new database
|
||||
return None
|
||||
|
||||
return None
|
||||
current_version = int(row[0])
|
||||
upgraded = bool(row[1])
|
||||
|
||||
compat_version: Optional[int] = None
|
||||
txn.execute("SELECT compat_version FROM schema_compat_version")
|
||||
row = txn.fetchone()
|
||||
if row is not None:
|
||||
compat_version = int(row[0])
|
||||
|
||||
txn.execute(
|
||||
"SELECT file FROM applied_schema_deltas WHERE version >= ?",
|
||||
(current_version,),
|
||||
)
|
||||
applied_deltas = tuple(d for d, in txn)
|
||||
|
||||
return _SchemaState(
|
||||
current_version=current_version,
|
||||
compat_version=compat_version,
|
||||
applied_deltas=applied_deltas,
|
||||
upgraded=upgraded,
|
||||
)
|
||||
|
||||
|
||||
@attr.s(slots=True)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue