152 lines
5.2 KiB
JavaScript
152 lines
5.2 KiB
JavaScript
import { randomUUID } from "node:crypto";
|
|
import { getPool } from "../db/pool";
|
|
/**
 * Returns the current instant as an ISO-8601 string.
 *
 * @returns {string} Result of `Date.prototype.toISOString()` for "now".
 */
function nowIso() {
  const current = new Date();
  return current.toISOString();
}
|
|
/**
 * Returns a copy of a notification whose `payload_json` is detached from the
 * source object.
 *
 * Top-level fields are copied with object spread. Object payloads (including
 * arrays) are deep-copied with `structuredClone`, so mutating nested values on
 * the copy cannot leak back into the original. A missing or null payload
 * becomes `{}`.
 *
 * @param {object} notification - Notification record to copy.
 * @returns {object} New notification object with an independent `payload_json`.
 */
function cloneNotification(notification) {
  const { payload_json: payload } = notification;
  const payloadCopy =
    payload !== null && typeof payload === "object"
      ? structuredClone(payload)
      : // Spreading null/undefined yields {}, matching the previous behavior.
        { ...payload };

  return {
    ...notification,
    payload_json: payloadCopy,
  };
}
|
|
/**
 * Converts a `notifications` table row into the notification object shape
 * used by this module.
 *
 * `delivery_channel` and `payload_type` are not read from the row; they are
 * always set to "mqtt" and "payment_success". A falsy `device_id` becomes
 * `null`, a falsy `payload_json` becomes `{}`, and falsy `reason`, `sent_at`,
 * `ack_at` and `next_retry_at` become `undefined`.
 *
 * @param {object} row - Row as returned by the database driver.
 * @returns {object} Mapped notification.
 */
function mapNotification(row) {
  const {
    id,
    transaction_id,
    device_id,
    delivery_status,
    retry_count,
    ack_status,
    event_id,
    reason,
    payload_json,
    created_at,
    updated_at,
    sent_at,
    ack_at,
    next_retry_at,
  } = row;

  // Optional columns surface as `undefined` rather than null/empty values.
  const orUndefined = (value) => value || undefined;

  return {
    id,
    transaction_id,
    device_id: device_id || null,
    delivery_channel: "mqtt",
    payload_type: "payment_success",
    delivery_status,
    retry_count,
    ack_status,
    event_id,
    reason: orUndefined(reason),
    payload_json: payload_json || {},
    created_at,
    updated_at,
    sent_at: orUndefined(sent_at),
    ack_at: orUndefined(ack_at),
    next_retry_at: orUndefined(next_retry_at),
  };
}
|
|
/**
 * Inserts a notification row, or touches the existing row when one already
 * exists for the same (transaction_id, event_id) pair.
 *
 * A fresh UUID is generated for `id`, `retry_count` starts at 0, `ack_status`
 * falls back to "not_needed", `reason` to null and `payload_json` to `{}`.
 * `created_at` and `updated_at` are both set to the current time. On conflict
 * only `updated_at` is overwritten; the stored row is returned either way.
 *
 * @param {object} payload - Fields for the new notification
 *   (`transaction_id`, `device_id`, `delivery_status`, `ack_status`,
 *   `event_id`, `reason`, `payload_json`).
 * @returns {Promise<object>} The mapped notification row.
 * @throws {Error} "NOTIFICATION_NOT_FOUND" when the upsert reports no row and
 *   the follow-up lookup finds none either.
 */
export async function createNotification(payload) {
  const now = nowIso();
  const insert = await getPool().query(
    `INSERT INTO notifications (
      id,
      transaction_id,
      device_id,
      delivery_status,
      retry_count,
      ack_status,
      event_id,
      reason,
      payload_json,
      created_at,
      updated_at
    ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
    ON CONFLICT (transaction_id, event_id) DO UPDATE
      SET updated_at = EXCLUDED.updated_at
    RETURNING *`,
    [
      randomUUID(),
      payload.transaction_id,
      payload.device_id,
      payload.delivery_status,
      0,
      payload.ack_status || "not_needed",
      payload.event_id,
      payload.reason || null,
      payload.payload_json || {},
      now,
      now,
    ]
  );

  if (insert.rowCount > 0) {
    return mapNotification(insert.rows[0]);
  }

  // Defensive fallback: the upsert reported no row, so look the record up by
  // the same key used in the ON CONFLICT clause.
  const { rows } = await getPool().query(
    "SELECT * FROM notifications WHERE transaction_id = $1 AND event_id = $2",
    [payload.transaction_id, payload.event_id]
  );
  const existing = rows[0];
  if (!existing) {
    // Fail with a recognizable error instead of a TypeError from mapping undefined.
    throw new Error("NOTIFICATION_NOT_FOUND");
  }
  return mapNotification(existing);
}
|
|
/**
 * Looks up a single notification by its primary id.
 *
 * @param {string} notificationId - Value matched against the `id` column.
 * @returns {Promise<object|null>} A cloned, mapped notification, or `null`
 *   when no row matches.
 */
export async function getNotificationById(notificationId) {
  const result = await getPool().query("SELECT * FROM notifications WHERE id = $1", [notificationId]);
  const row = result.rows[0];
  if (!row) {
    return null;
  }
  return cloneNotification(mapNotification(row));
}
|
|
/**
 * Applies a partial update to an existing notification and returns the
 * stored result.
 *
 * The current record is loaded first and merged with `patch`. Identity and
 * immutable fields (`id`, `transaction_id`, `device_id`, `delivery_channel`,
 * `payload_type`, `event_id`, `payload_json`, `created_at`) always keep their
 * stored values, and `updated_at` is set to the current time. Falsy `reason`,
 * `sent_at`, `ack_at` and `next_retry_at` are written as NULL.
 *
 * @param {string} notificationId - Id of the notification to update.
 * @param {object} patch - Fields to change (e.g. `delivery_status`,
 *   `retry_count`, `ack_status`, `reason`, `sent_at`, `ack_at`,
 *   `next_retry_at`).
 * @returns {Promise<object>} A cloned, mapped copy of the updated row.
 * @throws {Error} "NOTIFICATION_NOT_FOUND" when the notification does not
 *   exist, either at read time or when the UPDATE matches no row.
 */
export async function updateNotification(notificationId, patch) {
  const existing = await getNotificationById(notificationId);
  if (!existing) {
    throw new Error("NOTIFICATION_NOT_FOUND");
  }

  const next = {
    ...existing,
    ...patch,
    // Everything below overrides the patch so these fields cannot be changed.
    id: existing.id,
    transaction_id: existing.transaction_id,
    device_id: existing.device_id,
    delivery_channel: existing.delivery_channel,
    payload_type: existing.payload_type,
    event_id: existing.event_id,
    payload_json: existing.payload_json,
    created_at: existing.created_at,
    updated_at: nowIso(),
  };

  // NOTE(review): device_id is pinned to the stored value above, so the
  // COALESCE below only ever re-writes the same value. Confirm whether
  // device_id was meant to be patchable.
  const { rows } = await getPool().query(
    `UPDATE notifications
    SET delivery_status = $2,
        retry_count = $3,
        ack_status = $4,
        device_id = COALESCE($5, device_id),
        reason = $6,
        sent_at = $7,
        ack_at = $8,
        next_retry_at = $9,
        updated_at = $10
    WHERE id = $1
    RETURNING *`,
    [
      notificationId,
      next.delivery_status,
      next.retry_count,
      next.ack_status,
      next.device_id ?? null,
      next.reason || null,
      next.sent_at || null,
      next.ack_at || null,
      next.next_retry_at || null,
      next.updated_at,
    ]
  );

  // The read and the write are separate statements, so the row may have
  // disappeared in between; report that the same way as the initial lookup.
  const updated = rows[0];
  if (!updated) {
    throw new Error("NOTIFICATION_NOT_FOUND");
  }
  return cloneNotification(mapNotification(updated));
}
|
|
/**
 * Fetches the most recently created notification for a transaction.
 *
 * @param {string} transactionId - Value matched against `transaction_id`.
 * @returns {Promise<object|null>} The mapped newest row (by `created_at`),
 *   or `null` when the transaction has no notifications.
 */
export async function getNotificationByTransactionId(transactionId) {
  const result = await getPool().query(
    `SELECT * FROM notifications
    WHERE transaction_id = $1
    ORDER BY created_at DESC
    LIMIT 1`,
    [transactionId]
  );
  const latest = result.rows[0];
  if (!latest) {
    return null;
  }
  return mapNotification(latest);
}
|
|
/**
 * Fetches the notification stored for a given transaction / event pair.
 *
 * @param {string} transactionId - Value matched against `transaction_id`.
 * @param {string} eventId - Value matched against `event_id`.
 * @returns {Promise<object|null>} The mapped first matching row, or `null`
 *   when none exists.
 */
export async function getNotificationByTransactionAndEvent(transactionId, eventId) {
  const lookupParams = [transactionId, eventId];
  const result = await getPool().query(
    "SELECT * FROM notifications WHERE transaction_id = $1 AND event_id = $2",
    lookupParams
  );
  const match = result.rows[0];
  if (!match) {
    return null;
  }
  return mapNotification(match);
}
|
|
/**
 * Lists every notification recorded for a device, newest first.
 *
 * @param {string} deviceId - Value matched against `device_id`.
 * @returns {Promise<object[]>} Mapped notifications ordered by `created_at`
 *   descending; empty when the device has none.
 */
export async function listNotificationsByDevice(deviceId) {
  const result = await getPool().query(
    "SELECT * FROM notifications WHERE device_id = $1 ORDER BY created_at DESC",
    [deviceId]
  );
  return result.rows.map((row) => mapNotification(row));
}
|
|
/**
 * Lists notifications, newest first, optionally narrowed by equality filters.
 *
 * Only truthy values of `transaction_id`, `device_id` and `delivery_status`
 * are applied; they are combined with AND and passed as positional query
 * parameters. With no usable filter every notification is returned.
 *
 * @param {object} [filter] - Optional filter values.
 * @param {string} [filter.transaction_id]
 * @param {string} [filter.device_id]
 * @param {string} [filter.delivery_status]
 * @returns {Promise<object[]>} Mapped notifications ordered by `created_at`
 *   descending.
 */
export async function listNotifications(filter) {
  // Column order here fixes the placeholder numbering ($1, $2, ...).
  const filterableColumns = ["transaction_id", "device_id", "delivery_status"];
  const params = [];
  const conditions = [];

  for (const column of filterableColumns) {
    const value = filter?.[column];
    if (value) {
      params.push(value);
      conditions.push(`${column} = $${params.length}`);
    }
  }

  const where = conditions.length ? `WHERE ${conditions.join(" AND ")}` : "";
  const sql = `SELECT * FROM notifications ${where} ORDER BY created_at DESC`;
  const result = await getPool().query(sql, params);
  return result.rows.map((row) => mapNotification(row));
}
|
|
/**
 * Produces a copy of a notification suitable for handing to callers.
 *
 * @param {object} notification - Notification to copy.
 * @returns {object} The copy returned by `cloneNotification`.
 */
export function toNotificationPayload(notification) {
  const payloadCopy = cloneNotification(notification);
  return payloadCopy;
}
|