This commit is contained in:
Dull Bananas 2024-05-11 17:55:36 +00:00
parent 402ab1414f
commit 08ae13b877
2 changed files with 53 additions and 56 deletions

View File

@ -1,7 +1,10 @@
use crate::schema::previously_run_sql;
use anyhow::Context;
use diesel::{
backend::Backend,
connection::SimpleConnection,
migration::{Migration, MigrationSource},
pg::Pg,
select,
update,
Connection,
@ -12,11 +15,11 @@ use diesel::{
RunQueryDsl,
};
use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
use lemmy_utils::error::LemmyError;
use lemmy_utils::error::{LemmyError, LemmyResult};
use std::time::Instant;
use tracing::info;
const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
const EMBEDDED_MIGRATIONS: EmbeddedMigrations = embed_migrations!();
/// This SQL code sets up the `r` schema, which contains things that can be safely dropped and replaced
/// instead of being changed using migrations. It may not create or modify things outside of the `r` schema
@ -29,12 +32,37 @@ const REPLACEABLE_SCHEMA: &[&str] = &[
const REVERT_REPLACEABLE_SCHEMA: &str = "DROP SCHEMA IF EXISTS r CASCADE;";
// TODO use full names
const FORBID_DIESEL_CLI_MIGRATION_VERSION: &str = "0000000000000";
const LOCK_STATEMENT: &str = "LOCK __diesel_schema_migrations IN SHARE UPDATE EXCLUSIVE MODE;";
const CUSTOM_MIGRATION_RUNNER_MIGRATION_VERSION: &str = "2024-04-29-012113";
struct Migrations;
pub fn run(db_url: &str) -> Result<(), LemmyError> {
impl<DB: Backend> MigrationSource<DB> for Migrations {
fn migrations(&self) -> diesel::migration::Result<Vec<Box<dyn Migration<DB>>>> {
let mut migrations = EMBEDDED_MIGRATIONS.migrations()?;
let skipped_migration = if migrations.is_empty() {
None
} else {
Some(migrations.remove(0))
};
debug_assert_eq!(
skipped_migration.map(|m| m.name().to_string()),
Some("000000000000000_forbid_diesel_cli".to_string())
);
Ok(migrations)
}
}
fn get_pending_migrations(conn: &mut PgConnection) -> LemmyResult<Vec<Box<dyn Migration<Pg>>>> {
Ok(
conn
.pending_migrations(Migrations)
.map_err(|e| anyhow::anyhow!("Couldn't determine pending migrations: {e}"))?,
)
}
pub fn run(db_url: &str) -> LemmyResult<()> {
// Migrations don't support async connection
let mut conn = PgConnection::establish(db_url).with_context(|| "Error connecting to database")?;
@ -44,15 +72,12 @@ pub fn run(db_url: &str) -> Result<(), LemmyError> {
let new_sql = REPLACEABLE_SCHEMA.join("\n");
// Early return should be as fast as possible and not do any locks in the database, because this case
// is reached whenever a lemmy_server process is started, which can happen frequently on a production server
// with a horizontally scaled setup.
let unfiltered_pending_migrations = conn
.pending_migrations(MIGRATIONS)
.map_err(|e| anyhow::anyhow!("Couldn't determine pending migrations: {e}"))?;
let pending_migrations = get_pending_migrations(&mut conn)?;
// Check len first so this doesn't run without the previously_run_sql table existing
if unfiltered_pending_migrations.len() == 1 {
// If possible, skip locking the migrations table and recreating the "r" schema, so
// lemmy_server processes in a horizontally scaled setup can start without causing locks
if pending_migrations.is_empty() {
// The condition above implies that the migration that creates the previously_run_sql table was already run
let sql_unchanged: bool = select(
previously_run_sql::table
.select(previously_run_sql::content)
@ -63,12 +88,6 @@ pub fn run(db_url: &str) -> Result<(), LemmyError> {
.get_result(&mut conn)?;
if sql_unchanged {
debug_assert_eq!(
unfiltered_pending_migrations
.get(0)
.map(|m| m.name().version()),
Some(FORBID_DIESEL_CLI_MIGRATION_VERSION.into())
);
return Ok(());
}
}
@ -78,52 +97,29 @@ pub fn run(db_url: &str) -> Result<(), LemmyError> {
// lemmy_server processes from running this transaction concurrently. This lock does not block
// `MigrationHarness::pending_migrations` (`SELECT`) or `MigrationHarness::run_migration` (`INSERT`).
info!("Waiting for lock...");
conn.batch_execute("LOCK __diesel_schema_migrations IN SHARE UPDATE EXCLUSIVE MODE;")?;
conn.batch_execute(LOCK_STATEMENT)?;
info!("Running Database migrations (This may take a long time)...");
// Check pending migrations again after locking
let unfiltered_pending_migrations = conn.pending_migrations(MIGRATIONS).map_err(|e| anyhow::anyhow!("Couldn't determine pending migrations: {e}"))?;
// Does not include the "forbid_diesel_cli" migration
let pending_migrations = unfiltered_pending_migrations.get(1..).expect(
"original pending migrations length should be at least 1 because of the forbid_diesel_cli migration",
);
// Check migration version constants in debug mode
debug_assert_eq!(
unfiltered_pending_migrations
.get(0)
.map(|m| m.name().version()),
Some(FORBID_DIESEL_CLI_MIGRATION_VERSION.into())
);
debug_assert_eq!(
pending_migrations
.iter()
.filter(|m| m.name().version() == FORBID_DIESEL_CLI_MIGRATION_VERSION.into())
.count(),
0
);
/*TODO maybe do this for all migrations not just pending
debug_assert_eq!(
pending_migrations
.iter()
.filter(|m| m.name().version() == CUSTOM_MIGRATION_RUNNER_MIGRATION_VERSION.into())
.count(),
1
);*/
let pending_migrations = get_pending_migrations(conn)?;
// Run migrations, without stuff from replaceable_schema
conn.batch_execute(REVERT_REPLACEABLE_SCHEMA).context("Couldn't drop schema `r`")?;
for migration in pending_migrations {
conn.batch_execute(REVERT_REPLACEABLE_SCHEMA)?;
for migration in &pending_migrations {
let name = migration.name();
// TODO measure time on database
let start_time = Instant::now();
conn.run_migration(migration).map_err(|e| anyhow::anyhow!("Couldn't run migration {name}: {e}"))?;
conn
.run_migration(migration)
.map_err(|e| anyhow::anyhow!("Couldn't run migration {name}: {e}"))?;
let duration = start_time.elapsed().as_millis();
info!("{duration}ms {name}");
info!("{duration}ms run {name}");
}
// Run replaceable_schema
conn.batch_execute(&new_sql).context("Couldn't run SQL files in crates/db_schema/replaceable_schema")?;
conn
.batch_execute(&new_sql)
.context("Couldn't run SQL files in crates/db_schema/replaceable_schema")?;
let num_rows_updated = update(previously_run_sql::table)
.set(previously_run_sql::content.eq(new_sql))
@ -133,6 +129,7 @@ pub fn run(db_url: &str) -> Result<(), LemmyError> {
Ok(())
})?;
info!("Database migrations complete.");
Ok(())