This commit is contained in:
Louis Lam 2024-10-09 07:19:05 +08:00
parent 344fd52501
commit d7c3c40d74
2 changed files with 75 additions and 47 deletions

View File

@ -741,6 +741,14 @@ class Database {
await Settings.set("migratedAggregateTable", true); await Settings.set("migratedAggregateTable", true);
} }
// Empty the aggregate table if FORCE_MIGRATE_AGGREGATE_TABLE is set to 1
if (process.env.FORCE_MIGRATE_AGGREGATE_TABLE === "1") {
log.warn("db", "FORCE_MIGRATE_AGGREGATE_TABLE is set to 1, forcing aggregate table migration");
await R.exec("DELETE FROM stat_minutely");
await R.exec("DELETE FROM stat_hourly");
await R.exec("DELETE FROM stat_daily");
}
// //
let migrated = await Settings.get("migratedAggregateTable"); let migrated = await Settings.get("migratedAggregateTable");
@ -751,60 +759,61 @@ class Database {
log.info("db", "Migrating Aggregate Table"); log.info("db", "Migrating Aggregate Table");
// Migrate heartbeat to stat_minutely, using knex transaction log.info("db", "Getting list of unique dates and monitors");
const trx = await R.knex.transaction();
// Get a list of unique dates from the heartbeat table, using raw sql // Get a list of unique dates from the heartbeat table, using raw sql
let dates = await trx.raw(` let dates = await R.getAll(`
SELECT DISTINCT DATE(time) AS date SELECT DISTINCT DATE(time) AS date
FROM heartbeat FROM heartbeat
ORDER BY date ASC ORDER BY date ASC
`); `);
// Get a list of unique monitors from the heartbeat table, using raw sql // Get a list of unique monitors from the heartbeat table, using raw sql
let monitors = await trx.raw(` let monitors = await R.getAll(`
SELECT DISTINCT monitor_id SELECT DISTINCT monitor_id
FROM heartbeat FROM heartbeat
ORDER BY monitor_id ASC
`); `);
// Stop if stat_* tables are not empty
// SQL to empty these tables: DELETE FROM stat_minutely; DELETE FROM stat_hourly; DELETE FROM stat_daily;
for (let table of [ "stat_minutely", "stat_hourly", "stat_daily" ]) {
let countResult = await trx.raw(`SELECT COUNT(*) AS count FROM ${table}`);
let count = countResult[0].count;
if (count > 0) {
log.warn("db", `Aggregate table ${table} is not empty, migration will not be started (Maybe you were using 2.0.0-dev?)`);
trx.rollback();
return;
}
}
console.log("Dates", dates); console.log("Dates", dates);
console.log("Monitors", monitors); console.log("Monitors", monitors);
for (let monitor of monitors) { // Stop if stat_* tables are not empty
for (let date of dates) { for (let table of [ "stat_minutely", "stat_hourly", "stat_daily" ]) {
log.info("db", `Migrating monitor ${monitor.monitor_id} on date ${date.date}`); let countResult = await R.getRow(`SELECT COUNT(*) AS count FROM ${table}`);
let count = countResult.count;
// New Uptime Calculator if (count > 0) {
let calculator = new UptimeCalculator(); log.warn("db", `Aggregate table ${table} is not empty, migration will not be started (Maybe you were using 2.0.0-dev?)`);
return;
// TODO: Pass transaction to the calculator
// calculator.setTransaction(trx);
// Get all the heartbeats for this monitor and date
let heartbeats = await trx("heartbeat")
.where("monitor_id", monitor.monitor_id)
.whereRaw("DATE(time) = ?", [ date.date ])
.orderBy("time", "asc");
for (let heartbeat of heartbeats) {
calculator.update(heartbeat.status, heartbeat.ping, dayjs(heartbeat.time));
}
} }
} }
trx.commit(); for (let monitor of monitors) {
for (let date of dates) {
// New Uptime Calculator
let calculator = new UptimeCalculator();
calculator.monitorID = monitor.monitor_id;
calculator.setMigrationMode(true);
// Get all the heartbeats for this monitor and date
let heartbeats = await R.getAll(`
SELECT status, ping, time
FROM heartbeat
WHERE monitor_id = ?
AND DATE(time) = ?
ORDER BY time ASC
`, [ monitor.monitor_id, date.date ]);
if (heartbeats.length > 0) {
log.info("db", `Migrating monitor ${monitor.monitor_id} on date ${date.date}`);
}
for (let heartbeat of heartbeats) {
await calculator.update(heartbeat.status, parseFloat(heartbeat.ping), dayjs(heartbeat.time));
}
}
}
//await Settings.set("migratedAggregateTable", true); //await Settings.set("migratedAggregateTable", true);
} }

View File

@ -12,7 +12,6 @@ class UptimeCalculator {
* @private * @private
* @type {{string:UptimeCalculator}} * @type {{string:UptimeCalculator}}
*/ */
static list = {}; static list = {};
/** /**
@ -55,6 +54,12 @@ class UptimeCalculator {
lastHourlyStatBean = null; lastHourlyStatBean = null;
lastMinutelyStatBean = null; lastMinutelyStatBean = null;
/**
* For migration purposes.
* @type {boolean}
*/
migrationMode = false;
/** /**
* Get the uptime calculator for a monitor * Get the uptime calculator for a monitor
* Initializes and returns the monitor if it does not exist * Initializes and returns the monitor if it does not exist
@ -194,6 +199,10 @@ class UptimeCalculator {
* @throws {Error} Invalid status * @throws {Error} Invalid status
*/ */
async update(status, ping = 0, date) { async update(status, ping = 0, date) {
if (!this.monitorID) {
throw new Error("Monitor ID is required");
}
if (!date) { if (!date) {
date = this.getCurrentDate(); date = this.getCurrentDate();
} }
@ -330,17 +339,19 @@ class UptimeCalculator {
} }
await R.store(minutelyStatBean); await R.store(minutelyStatBean);
// Remove the old data if (!this.migrationMode) {
log.debug("uptime-calc", "Remove old data"); // Remove the old data
await R.exec("DELETE FROM stat_minutely WHERE monitor_id = ? AND timestamp < ?", [ log.debug("uptime-calc", "Remove old data");
this.monitorID, await R.exec("DELETE FROM stat_minutely WHERE monitor_id = ? AND timestamp < ?", [
this.getMinutelyKey(date.subtract(24, "hour")), this.monitorID,
]); this.getMinutelyKey(date.subtract(24, "hour")),
]);
await R.exec("DELETE FROM stat_hourly WHERE monitor_id = ? AND timestamp < ?", [ await R.exec("DELETE FROM stat_hourly WHERE monitor_id = ? AND timestamp < ?", [
this.monitorID, this.monitorID,
this.getHourlyKey(date.subtract(30, "day")), this.getHourlyKey(date.subtract(30, "day")),
]); ]);
}
return date; return date;
} }
@ -815,6 +826,14 @@ class UptimeCalculator {
return dayjs.utc(); return dayjs.utc();
} }
/**
* For migration purposes.
* @param {boolean} value Migration mode on/off
* @returns {void}
*/
setMigrationMode(value) {
this.migrationMode = value;
}
} }
class UptimeDataResult { class UptimeDataResult {