import fs from "node:fs/promises"; import path from "node:path"; import { pathToFileURL } from "node:url"; import { closePool, ensureSchema, getPool } from "../src/shared/db/pool"; type MigrationContext = { ensureSchema: typeof ensureSchema; }; type JsMigrationModule = { up?: (context: MigrationContext) => Promise; }; const migrationsDir = path.resolve(process.cwd(), "migrations"); async function ensureMigrationTable() { await getPool().query(` CREATE TABLE IF NOT EXISTS schema_migrations ( id TEXT PRIMARY KEY, filename TEXT NOT NULL, checksum TEXT, applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ) `); } async function listMigrationFiles() { const entries = await fs.readdir(migrationsDir, { withFileTypes: true }); return entries .filter((entry) => entry.isFile()) .map((entry) => entry.name) .filter((name) => /^\d+_.+\.(sql|mjs|mts)$/.test(name)) .sort((a, b) => a.localeCompare(b)); } async function getAppliedMigrationIds() { const { rows } = await getPool().query("SELECT id FROM schema_migrations ORDER BY id ASC"); return new Set(rows.map((row) => String(row.id))); } function migrationId(filename: string) { return filename.split("_")[0]; } async function runSqlMigration(filePath: string) { const sql = await fs.readFile(filePath, "utf8"); if (!sql.trim()) { return; } await getPool().query(sql); } async function runJsMigration(filePath: string) { const moduleUrl = pathToFileURL(filePath).href; const migration = (await import(moduleUrl)) as JsMigrationModule; if (typeof migration.up !== "function") { throw new Error(`${path.basename(filePath)} must export async function up(context)`); } await migration.up({ ensureSchema }); } async function recordMigration(id: string, filename: string) { await getPool().query( `INSERT INTO schema_migrations (id, filename, applied_at) VALUES ($1,$2,NOW()) ON CONFLICT (id) DO NOTHING`, [id, filename] ); } async function main() { await ensureMigrationTable(); const lock = await getPool().query("SELECT pg_try_advisory_lock($1) AS locked", [7642001]); if (!lock.rows[0]?.locked) { throw new Error("another migration process is already running"); } try { const files = await listMigrationFiles(); const applied = await getAppliedMigrationIds(); let appliedCount = 0; for (const filename of files) { const id = migrationId(filename); if (applied.has(id)) { console.log(`skip ${filename}`); continue; } const filePath = path.join(migrationsDir, filename); console.log(`apply ${filename}`); if (filename.endsWith(".sql")) { await runSqlMigration(filePath); } else { await runJsMigration(filePath); } await recordMigration(id, filename); appliedCount += 1; } console.log(`migration complete applied=${appliedCount}`); } finally { await getPool().query("SELECT pg_advisory_unlock($1)", [7642001]); await closePool(); } } main().catch(async (error) => { console.error(error instanceof Error ? error.message : error); await closePool(); process.exit(1); });