113 lines
3.1 KiB
TypeScript
113 lines
3.1 KiB
TypeScript
import fs from "node:fs/promises";
|
|
import path from "node:path";
|
|
import { pathToFileURL } from "node:url";
|
|
import { closePool, ensureSchema, getPool } from "../src/shared/db/pool";
|
|
|
|
/**
 * Helpers handed to a JS/TS migration's `up` function.
 *
 * Currently this is only the shared `ensureSchema` helper imported from the
 * database pool module.
 */
interface MigrationContext {
  ensureSchema: typeof ensureSchema;
}
|
|
|
|
/**
 * Expected shape of a dynamically imported `.mjs` / `.mts` migration module.
 *
 * `up` is declared optional because the module is loaded at runtime and may
 * not export it; callers must check before invoking.
 */
interface JsMigrationModule {
  up?: (context: MigrationContext) => Promise<void>;
}
|
|
|
|
/** Absolute path of the `migrations` directory under the current working directory. */
const migrationsDir = path.join(process.cwd(), "migrations");
|
|
|
|
/**
 * Creates the `schema_migrations` bookkeeping table if it does not exist yet.
 *
 * The table stores one row per applied migration: its id, the filename it was
 * applied from, an optional checksum, and the time it was applied.
 */
async function ensureMigrationTable() {
  const createTableSql = `
    CREATE TABLE IF NOT EXISTS schema_migrations (
      id TEXT PRIMARY KEY,
      filename TEXT NOT NULL,
      checksum TEXT,
      applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    )
  `;
  await getPool().query(createTableSql);
}
|
|
|
|
/**
 * Lists the migration files in `migrationsDir` in the order they should run.
 *
 * Only regular files named `<digits>_<name>.sql`, `.mjs` or `.mts` are
 * returned; directories and anything else are ignored.
 *
 * @returns The matching filenames (not full paths), sorted by name.
 */
async function listMigrationFiles() {
  const entries = await fs.readdir(migrationsDir, { withFileTypes: true });
  return entries
    .filter((entry) => entry.isFile())
    .map((entry) => entry.name)
    .filter((name) => /^\d+_.+\.(sql|mjs|mts)$/.test(name))
    // Numeric collation makes "2_x" sort before "10_x"; a plain string compare
    // would run migration 10 first. Zero-padded prefixes sort the same either
    // way. The locale is pinned so the order does not depend on the host.
    .sort((a, b) => a.localeCompare(b, "en", { numeric: true }));
}
|
|
|
|
/**
 * Loads the ids of every migration already recorded in `schema_migrations`.
 *
 * @returns A set of the recorded ids, each coerced to a string.
 */
async function getAppliedMigrationIds() {
  const result = await getPool().query("SELECT id FROM schema_migrations ORDER BY id ASC");
  const ids = new Set<string>();
  for (const record of result.rows) {
    ids.add(String(record.id));
  }
  return ids;
}
|
|
|
|
/**
 * Derives a migration's id from its filename: everything before the first
 * underscore (e.g. `"001_init.sql"` -> `"001"`).
 *
 * @param filename Migration filename, without directory.
 * @returns The prefix before the first `_`, or the whole name if it has none.
 */
function migrationId(filename: string) {
  const separatorAt = filename.indexOf("_");
  if (separatorAt === -1) {
    return filename;
  }
  return filename.slice(0, separatorAt);
}
|
|
|
|
/**
 * Executes a `.sql` migration by sending the file's full contents to the
 * database as a single query.
 *
 * Files that are empty or contain only whitespace are treated as a no-op.
 *
 * @param filePath Path of the SQL file to run.
 */
async function runSqlMigration(filePath: string) {
  const statements = await fs.readFile(filePath, "utf8");
  const hasContent = statements.trim().length > 0;
  if (hasContent) {
    await getPool().query(statements);
  }
}
|
|
|
|
/**
 * Executes a JS/TS migration module by dynamically importing it and calling
 * its exported `up` function with the migration context.
 *
 * @param filePath Path of the module file; converted to a `file://` URL
 *   before it is passed to `import()`.
 * @throws Error when the module does not export a function named `up`.
 */
async function runJsMigration(filePath: string) {
  const loaded = (await import(pathToFileURL(filePath).href)) as JsMigrationModule;
  if (typeof loaded.up === "function") {
    await loaded.up({ ensureSchema });
    return;
  }
  throw new Error(`${path.basename(filePath)} must export async function up(context)`);
}
|
|
|
|
/**
 * Marks a migration as applied by inserting a row into `schema_migrations`.
 *
 * If a row with the same id already exists the insert is silently skipped
 * (`ON CONFLICT (id) DO NOTHING`).
 *
 * @param id Migration id (primary key of the row).
 * @param filename Filename the migration was applied from.
 */
async function recordMigration(id: string, filename: string) {
  const insertSql = `INSERT INTO schema_migrations (id, filename, applied_at)
     VALUES ($1,$2,NOW())
     ON CONFLICT (id) DO NOTHING`;
  const params = [id, filename];
  await getPool().query(insertSql, params);
}
|
|
|
|
/**
 * Entry point: applies every pending migration in filename order.
 *
 * Steps:
 *  1. Ensure the `schema_migrations` table exists.
 *  2. Take a Postgres advisory lock so only one migrator runs at a time.
 *  3. For each migration file whose id is not yet recorded, run it (`.sql`
 *     files as SQL, everything else as a JS module) and record it.
 *  4. Release the lock and close the pool.
 *
 * @throws Error when the advisory lock is held by another session, or when
 *   any migration fails (remaining migrations are not attempted).
 */
async function main() {
  // Key shared by the lock and unlock calls.
  const lockKey = 7642001;

  await ensureMigrationTable();

  // pg_try_advisory_lock is session-level: it belongs to the connection that
  // took it, and pg_advisory_unlock only works from that same connection.
  // Pool-level query() may use a different connection for each call, so lock
  // and unlock go through one dedicated client.
  // NOTE(review): assumes getPool() returns a node-postgres style Pool
  // exposing connect() / client.release() — confirm against the pool module.
  const lockClient = await getPool().connect();
  let lockHeld = false;
  try {
    const lock = await lockClient.query("SELECT pg_try_advisory_lock($1) AS locked", [lockKey]);
    lockHeld = Boolean(lock.rows[0]?.locked);
  } finally {
    // Hand the client back right away if we did not end up holding the lock.
    if (!lockHeld) {
      lockClient.release();
    }
  }
  if (!lockHeld) {
    throw new Error("another migration process is already running");
  }

  try {
    const files = await listMigrationFiles();
    const applied = await getAppliedMigrationIds();
    let appliedCount = 0;

    for (const filename of files) {
      const id = migrationId(filename);
      if (applied.has(id)) {
        console.log(`skip ${filename}`);
        continue;
      }

      const filePath = path.join(migrationsDir, filename);
      console.log(`apply ${filename}`);
      if (filename.endsWith(".sql")) {
        await runSqlMigration(filePath);
      } else {
        await runJsMigration(filePath);
      }
      await recordMigration(id, filename);
      appliedCount += 1;
    }

    console.log(`migration complete applied=${appliedCount}`);
  } finally {
    try {
      await lockClient.query("SELECT pg_advisory_unlock($1)", [lockKey]);
    } finally {
      // Runs even if the unlock query fails, so the pool is always closed.
      lockClient.release();
      await closePool();
    }
  }
}
|
|
|
|
// Script entry: run the migrations and, on any failure, print the error,
// attempt to close the pool, and exit with a non-zero status.
main().catch(async (error: unknown) => {
  console.error(error instanceof Error ? error.message : error);
  try {
    await closePool();
  } catch (closeError) {
    // Closing can itself fail (for example if the pool was already closed on
    // the way out of main); report it but do not let it block the exit below.
    console.error(closeError instanceof Error ? closeError.message : closeError);
  } finally {
    // Always exit non-zero so callers (CI, deploy scripts) see the failure.
    process.exit(1);
  }
});
|