Fix schema management to work with multiple data stores.

This commit is contained in:
Erik Johnston 2019-10-21 16:08:40 +01:00
parent c66a06ac6b
commit ffd24545bb

View File

@ -14,12 +14,13 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import fnmatch
import imp import imp
import logging import logging
import os import os
import re import re
import attr
from synapse.storage.engines.postgres import PostgresEngine from synapse.storage.engines.postgres import PostgresEngine
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -54,6 +55,10 @@ def prepare_database(db_conn, database_engine, config):
application config, or None if we are connecting to an existing application config, or None if we are connecting to an existing
database which we expect to be configured already database which we expect to be configured already
""" """
# For now we only have the one datastore.
data_stores = ["main"]
try: try:
cur = db_conn.cursor() cur = db_conn.cursor()
version_info = _get_or_create_schema_state(cur, database_engine) version_info = _get_or_create_schema_state(cur, database_engine)
@ -68,10 +73,16 @@ def prepare_database(db_conn, database_engine, config):
raise UpgradeDatabaseException("Database needs to be upgraded") raise UpgradeDatabaseException("Database needs to be upgraded")
else: else:
_upgrade_existing_database( _upgrade_existing_database(
cur, user_version, delta_files, upgraded, database_engine, config cur,
user_version,
delta_files,
upgraded,
database_engine,
config,
data_stores=data_stores,
) )
else: else:
_setup_new_database(cur, database_engine) _setup_new_database(cur, database_engine, data_stores=data_stores)
# check if any of our configured dynamic modules want a database # check if any of our configured dynamic modules want a database
if config is not None: if config is not None:
@ -84,7 +95,7 @@ def prepare_database(db_conn, database_engine, config):
raise raise
def _setup_new_database(cur, database_engine): def _setup_new_database(cur, database_engine, data_stores):
"""Sets up the database by finding a base set of "full schemas" and then """Sets up the database by finding a base set of "full schemas" and then
applying any necessary deltas. applying any necessary deltas.
@ -115,48 +126,65 @@ def _setup_new_database(cur, database_engine):
current_dir = os.path.join(dir_path, "schema", "full_schemas") current_dir = os.path.join(dir_path, "schema", "full_schemas")
directory_entries = os.listdir(current_dir) directory_entries = os.listdir(current_dir)
valid_dirs = [] # First we find the highest full schema version we have
pattern = re.compile(r"^\d+(\.sql)?$") valid_versions = []
for filename in directory_entries:
try:
ver = int(filename)
except ValueError:
continue
if ver <= SCHEMA_VERSION:
valid_versions.append(ver)
if not valid_versions:
raise PrepareDatabaseException(
"Could not find a suitable base set of full schemas"
)
max_current_ver = max(valid_versions)
logger.debug("Initialising schema v%d", max_current_ver)
# Now lets find all the full schema files, both in the global schema and
# in data store schemas.
directories = [os.path.join(current_dir, str(max_current_ver))]
directories.extend(
os.path.join(
dir_path,
"data_stores",
data_store,
"schema",
"full_schemas",
str(max_current_ver),
)
for data_store in data_stores
)
directory_entries = []
for directory in directories:
directory_entries.extend(
_DirectoryListing(file_name, os.path.join(directory, file_name))
for file_name in os.listdir(directory)
)
if isinstance(database_engine, PostgresEngine): if isinstance(database_engine, PostgresEngine):
specific = "postgres" specific = "postgres"
else: else:
specific = "sqlite" specific = "sqlite"
specific_pattern = re.compile(r"^\d+(\.sql." + specific + r")?$") directory_entries.sort()
for entry in directory_entries:
for filename in directory_entries: if entry.file_name.endswith(".sql") or entry.file_name.endswith(
match = pattern.match(filename) or specific_pattern.match(filename) ".sql." + specific
abs_path = os.path.join(current_dir, filename) ):
if match and os.path.isdir(abs_path): logger.debug("Applying schema %s", entry.absolute_path)
ver = int(match.group(0)) executescript(cur, entry.absolute_path)
if ver <= SCHEMA_VERSION:
valid_dirs.append((ver, abs_path))
else:
logger.debug("Ignoring entry '%s' in 'full_schemas'", filename)
if not valid_dirs:
raise PrepareDatabaseException(
"Could not find a suitable base set of full schemas"
)
max_current_ver, sql_dir = max(valid_dirs, key=lambda x: x[0])
logger.debug("Initialising schema v%d", max_current_ver)
directory_entries = os.listdir(sql_dir)
for filename in sorted(
fnmatch.filter(directory_entries, "*.sql")
+ fnmatch.filter(directory_entries, "*.sql." + specific)
):
sql_loc = os.path.join(sql_dir, filename)
logger.debug("Applying schema %s", sql_loc)
executescript(cur, sql_loc)
cur.execute( cur.execute(
database_engine.convert_param_style( database_engine.convert_param_style(
"INSERT INTO schema_version (version, upgraded)" " VALUES (?,?)" "INSERT INTO schema_version (version, upgraded) VALUES (?,?)"
), ),
(max_current_ver, False), (max_current_ver, False),
) )
@ -168,6 +196,7 @@ def _setup_new_database(cur, database_engine):
upgraded=False, upgraded=False,
database_engine=database_engine, database_engine=database_engine,
config=None, config=None,
data_stores=data_stores,
is_empty=True, is_empty=True,
) )
@ -179,6 +208,7 @@ def _upgrade_existing_database(
upgraded, upgraded,
database_engine, database_engine,
config, config,
data_stores,
is_empty=False, is_empty=False,
): ):
"""Upgrades an existing database. """Upgrades an existing database.
@ -248,24 +278,51 @@ def _upgrade_existing_database(
for v in range(start_ver, SCHEMA_VERSION + 1): for v in range(start_ver, SCHEMA_VERSION + 1):
logger.info("Upgrading schema to v%d", v) logger.info("Upgrading schema to v%d", v)
delta_dir = os.path.join(dir_path, "schema", "delta", str(v)) # We need to search both the global and per data store schema
# directories for schema updates.
try: # First we find the directories to search in
directory_entries = os.listdir(delta_dir) delta_dir = os.path.join(dir_path, "schema", "delta", str(v))
except OSError: directories = [delta_dir]
logger.exception("Could not open delta dir for version %d", v) for data_store in data_stores:
raise UpgradeDatabaseException( directories.append(
"Could not open delta dir for version %d" % (v,) os.path.join(
dir_path, "data_stores", data_store, "schema", "delta", str(v)
)
) )
# Now find which directories have anything of interest.
directory_entries = []
for directory in directories:
logger.debug("Looking for schema deltas in %s", directory)
try:
file_names = os.listdir(directory)
directory_entries.extend(
_DirectoryListing(file_name, os.path.join(directory, file_name))
for file_name in file_names
)
except FileNotFoundError:
# Data stores can have empty entries for a given version delta.
pass
except OSError:
logger.exception("Could not open delta dir for version %d", v)
raise UpgradeDatabaseException(
"Could not open delta dir for version %d" % (v,)
)
if not directory_entries:
continue
directory_entries.sort() directory_entries.sort()
for file_name in directory_entries: for entry in directory_entries:
file_name = entry.file_name
relative_path = os.path.join(str(v), file_name) relative_path = os.path.join(str(v), file_name)
absolute_path = entry.absolute_path
logger.debug("Found file: %s", relative_path) logger.debug("Found file: %s", relative_path)
if relative_path in applied_delta_files: if relative_path in applied_delta_files:
continue continue
absolute_path = os.path.join(dir_path, "schema", "delta", relative_path)
root_name, ext = os.path.splitext(file_name) root_name, ext = os.path.splitext(file_name)
if ext == ".py": if ext == ".py":
# This is a python upgrade module. We need to import into some # This is a python upgrade module. We need to import into some
@ -448,3 +505,13 @@ def _get_or_create_schema_state(txn, database_engine):
return current_version, applied_deltas, upgraded return current_version, applied_deltas, upgraded
return None return None
@attr.s()
class _DirectoryListing(object):
"""Helper class to store schema file name and the
absolute path to it.
"""
file_name = attr.ib()
absolute_path = attr.ib()