import { getActiveBindingByDevice, getActiveBindingByTerminal } from "../store/bindingStore"; import { createNotification, getNotificationByTransactionAndEvent, getNotificationByTransactionId, listNotifications, toNotificationPayload, updateNotification } from "../store/notificationStore"; import { getMerchantById } from "../store/merchantStore"; import { getTransactionById, listTransactions, toTransactionPayload } from "../store/transactionStore"; import { buildPaymentSuccessPayload, publishPaymentSuccess } from "../services/mqttPublisher"; import { subscribeTransactionPaid } from "../events/transactionEvents"; import { env } from "../../config/env"; const RETRY_INTERVAL_MS = [ env.MQTT_PUBLISH_DEFAULT_RETRY_INTERVAL_MS || 15000, (env.MQTT_PUBLISH_DEFAULT_RETRY_INTERVAL_MS || 15000) * 2, (env.MQTT_PUBLISH_DEFAULT_RETRY_INTERVAL_MS || 15000) * 4 ]; const RETRY_POLL_INTERVAL_MS = 5000; const MAX_RETRY = 3; async function resolveNotificationDevice(payload) { if (payload.device_id) { const binding = await getActiveBindingByDevice(payload.device_id); if (binding && binding.terminal_id === payload.terminal_id) { return { deviceId: payload.device_id, source: "tx_device" }; } } const terminalBinding = await getActiveBindingByTerminal(payload.terminal_id); if (!terminalBinding) { return null; } return { deviceId: terminalBinding.device_id, source: "binding_device" }; } function buildNotificationPayload(txPayload, deviceId) { return { message_type: "payment_success", transaction_id: txPayload.transaction_id, merchant_id: txPayload.merchant_id, terminal_id: txPayload.terminal_id, amount: txPayload.amount, currency: txPayload.currency, paid_at: txPayload.paid_at, partner_reference: txPayload.partner_reference, target_device_id: deviceId }; } function resolveDeliveryStatus(bindingResolved) { return bindingResolved ? "queued" : "failed"; } function resolveFailureReason(bindingResolved) { if (!bindingResolved) { return "NOTIFICATION_NO_ACTIVE_BINDING"; } return undefined; } function makeNextRetryDate(retryCount) { const intervalMs = RETRY_INTERVAL_MS[Math.min(retryCount, RETRY_INTERVAL_MS.length - 1)] || 60000; return new Date(Date.now() + intervalMs).toISOString(); } async function getNotificationMerchantName(merchantId) { const merchant = await getMerchantById(merchantId); return merchant?.brand_name || merchant?.legal_name || merchantId; } function mapMqttFailureState(retryCount, reason) { const nextRetryCount = retryCount + 1; if (nextRetryCount >= MAX_RETRY) { return { status: "failed", retry_count: nextRetryCount, reason: reason || "NOTIFICATION_RETRY_EXHAUSTED" }; } return { status: "retrying", retry_count: nextRetryCount, next_retry_at: makeNextRetryDate(retryCount), reason }; } async function markNotificationSent(notification, publishResult) { await updateNotification(notification.id, { delivery_status: "sent", retry_count: notification.retry_count, ack_status: "not_supported", sent_at: publishResult.publishedAt }); } async function markNotificationFailed(notification, publishResult) { const next = mapMqttFailureState(notification.retry_count, publishResult.reason); await updateNotification(notification.id, { delivery_status: next.status, retry_count: next.retry_count, reason: next.reason, ack_status: "not_supported", next_retry_at: next.next_retry_at }); } async function markNoDeviceFailure(notification) { await updateNotification(notification.id, { delivery_status: "failed", retry_count: 0, reason: "NOTIFICATION_NO_ACTIVE_BINDING", ack_status: "not_needed" }); } async function resolveNotificationFromTransaction(notification) { const tx = await getTransactionById(notification.transaction_id); if (!tx) { return null; } return { transaction_id: tx.id, merchant_id: tx.merchant_id, outlet_id: tx.outlet_id, terminal_id: tx.terminal_id, device_id: tx.device_id, amount: tx.amount, currency: tx.currency, paid_at: tx.paid_at, partner_reference: tx.partner_reference }; } async function getMqttPayloadFromNotification(notification, paidAt) { return buildPaymentSuccessPayload({ transaction_id: String(notification.payload_json.transaction_id || ""), merchant_id: String(notification.payload_json.merchant_id || ""), merchant_name: await getNotificationMerchantName(String(notification.payload_json.merchant_id || "")), device_id: String(notification.device_id || ""), amount: Number(notification.payload_json.amount || 0), currency: String(notification.payload_json.currency || "IDR"), paid_at: paidAt, partner_reference: String(notification.payload_json.partner_reference || ""), event_id: notification.event_id }); } async function publishNotificationNow(notification, eventPayload) { if (!notification.device_id) { const resolvedFromTransaction = await resolveNotificationFromTransaction(notification); if (!resolvedFromTransaction) { await markNoDeviceFailure(notification); return; } const resolved = await resolveNotificationDevice(resolvedFromTransaction); if (!resolved) { await markNoDeviceFailure(notification); return; } notification = await updateNotification(notification.id, { device_id: resolved.deviceId, delivery_status: "queued", reason: undefined }); } const effectivePayload = eventPayload ?? (await resolveNotificationFromTransaction(notification)); if (!effectivePayload) { await markNoDeviceFailure(notification); return; } const mqttPayload = await buildPaymentSuccessPayload({ transaction_id: effectivePayload.transaction_id, merchant_id: effectivePayload.merchant_id, merchant_name: await getNotificationMerchantName(effectivePayload.merchant_id), device_id: notification.device_id || String(effectivePayload.device_id || ""), amount: effectivePayload.amount, currency: effectivePayload.currency, paid_at: effectivePayload.paid_at, partner_reference: effectivePayload.partner_reference, event_id: notification.event_id }); const result = await publishPaymentSuccess(mqttPayload); if (!result.ok) { await markNotificationFailed(notification, result); return; } await markNotificationSent(notification, result); } async function onTransactionPaid(event) { const payload = event.payload_json; const existing = await getNotificationByTransactionAndEvent(payload.transaction_id, event.id); if (existing) { return; } const resolved = await resolveNotificationDevice(payload); const deliveryStatus = resolveDeliveryStatus(resolved); const created = await createNotification({ transaction_id: payload.transaction_id, device_id: resolved?.deviceId || null, event_id: event.id, delivery_status: deliveryStatus, reason: resolveFailureReason(resolved), payload_json: buildNotificationPayload(payload, resolved?.deviceId || ""), ack_status: resolved ? "not_supported" : "not_needed" }); await publishNotificationNow(created, payload); } async function bootstrapNotificationForPaidTransaction(transaction) { const existing = await getNotificationByTransactionId(transaction.id); if (existing) { return; } const payload = { transaction_id: transaction.id, merchant_id: transaction.merchant_id, outlet_id: transaction.outlet_id, terminal_id: transaction.terminal_id, device_id: transaction.device_id, amount: transaction.amount, currency: transaction.currency, partner_reference: transaction.partner_reference, paid_at: transaction.paid_at }; const eventId = `bootstrap_${transaction.id}`; const resolved = await resolveNotificationDevice(payload); const deliveryStatus = resolveDeliveryStatus(resolved); const created = await createNotification({ transaction_id: transaction.id, device_id: resolved?.deviceId || null, event_id: eventId, delivery_status: deliveryStatus, reason: resolveFailureReason(resolved), payload_json: buildNotificationPayload(payload, resolved?.deviceId || ""), ack_status: resolved ? "not_supported" : "not_needed" }); await publishNotificationNow(created, payload); } async function seedPaidTransactions() { const paidTransactions = await listTransactions({ status: "paid" }); for (const tx of paidTransactions) { await bootstrapNotificationForPaidTransaction(toTransactionPayload(tx)); } } async function processRetryCycle() { const now = new Date().toISOString(); const retrying = await listNotifications({ delivery_status: "retrying" }); const due = retrying.filter((notification) => { if (!notification.next_retry_at) { return true; } return notification.next_retry_at <= now; }); for (const notification of due) { await publishNotificationNow(notification, null); } } export async function retryNotificationByTransactionId(transactionId) { const tx = await getTransactionById(transactionId); if (!tx) { return null; } if (tx.status !== "paid") { throw new Error("NOTIFICATION_PUBLISH_CONDITION"); } const notification = await getNotificationByTransactionId(transactionId); if (!notification) { return null; } if (notification.delivery_status === "acknowledged") { return toNotificationPayload(notification); } await publishNotificationNow(notification, null); const updated = await getNotificationByTransactionId(transactionId); return updated ? toNotificationPayload(updated) : null; } export function startNotificationOrchestrator() { void seedPaidTransactions(); void subscribeTransactionPaid((event) => { void onTransactionPaid(event); }); setInterval(() => { void processRetryCycle(); }, RETRY_POLL_INTERVAL_MS); }