Initial commit
This commit is contained in:
274
dist/shared/orchestrators/notificationOrchestrator.js
vendored
Normal file
274
dist/shared/orchestrators/notificationOrchestrator.js
vendored
Normal file
@ -0,0 +1,274 @@
|
||||
import { getActiveBindingByDevice, getActiveBindingByTerminal } from "../store/bindingStore";
|
||||
import { createNotification, getNotificationByTransactionAndEvent, getNotificationByTransactionId, listNotifications, toNotificationPayload, updateNotification } from "../store/notificationStore";
|
||||
import { getMerchantById } from "../store/merchantStore";
|
||||
import { getTransactionById, listTransactions, toTransactionPayload } from "../store/transactionStore";
|
||||
import { buildPaymentSuccessPayload, publishPaymentSuccess } from "../services/mqttPublisher";
|
||||
import { subscribeTransactionPaid } from "../events/transactionEvents";
|
||||
import { env } from "../../config/env";
|
||||
const RETRY_INTERVAL_MS = [
|
||||
env.MQTT_PUBLISH_DEFAULT_RETRY_INTERVAL_MS || 15000,
|
||||
(env.MQTT_PUBLISH_DEFAULT_RETRY_INTERVAL_MS || 15000) * 2,
|
||||
(env.MQTT_PUBLISH_DEFAULT_RETRY_INTERVAL_MS || 15000) * 4
|
||||
];
|
||||
const RETRY_POLL_INTERVAL_MS = 5000;
|
||||
const MAX_RETRY = 3;
|
||||
async function resolveNotificationDevice(payload) {
|
||||
if (payload.device_id) {
|
||||
const binding = await getActiveBindingByDevice(payload.device_id);
|
||||
if (binding && binding.terminal_id === payload.terminal_id) {
|
||||
return {
|
||||
deviceId: payload.device_id,
|
||||
source: "tx_device"
|
||||
};
|
||||
}
|
||||
}
|
||||
const terminalBinding = await getActiveBindingByTerminal(payload.terminal_id);
|
||||
if (!terminalBinding) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
deviceId: terminalBinding.device_id,
|
||||
source: "binding_device"
|
||||
};
|
||||
}
|
||||
function buildNotificationPayload(txPayload, deviceId) {
|
||||
return {
|
||||
message_type: "payment_success",
|
||||
transaction_id: txPayload.transaction_id,
|
||||
merchant_id: txPayload.merchant_id,
|
||||
terminal_id: txPayload.terminal_id,
|
||||
amount: txPayload.amount,
|
||||
currency: txPayload.currency,
|
||||
paid_at: txPayload.paid_at,
|
||||
partner_reference: txPayload.partner_reference,
|
||||
target_device_id: deviceId
|
||||
};
|
||||
}
|
||||
function resolveDeliveryStatus(bindingResolved) {
|
||||
return bindingResolved ? "queued" : "failed";
|
||||
}
|
||||
function resolveFailureReason(bindingResolved) {
|
||||
if (!bindingResolved) {
|
||||
return "NOTIFICATION_NO_ACTIVE_BINDING";
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
function makeNextRetryDate(retryCount) {
|
||||
const intervalMs = RETRY_INTERVAL_MS[Math.min(retryCount, RETRY_INTERVAL_MS.length - 1)] || 60000;
|
||||
return new Date(Date.now() + intervalMs).toISOString();
|
||||
}
|
||||
async function getNotificationMerchantName(merchantId) {
|
||||
const merchant = await getMerchantById(merchantId);
|
||||
return merchant?.brand_name || merchant?.legal_name || merchantId;
|
||||
}
|
||||
function mapMqttFailureState(retryCount, reason) {
|
||||
const nextRetryCount = retryCount + 1;
|
||||
if (nextRetryCount >= MAX_RETRY) {
|
||||
return {
|
||||
status: "failed",
|
||||
retry_count: nextRetryCount,
|
||||
reason: reason || "NOTIFICATION_RETRY_EXHAUSTED"
|
||||
};
|
||||
}
|
||||
return {
|
||||
status: "retrying",
|
||||
retry_count: nextRetryCount,
|
||||
next_retry_at: makeNextRetryDate(retryCount),
|
||||
reason
|
||||
};
|
||||
}
|
||||
async function markNotificationSent(notification, publishResult) {
|
||||
await updateNotification(notification.id, {
|
||||
delivery_status: "sent",
|
||||
retry_count: notification.retry_count,
|
||||
ack_status: "not_supported",
|
||||
sent_at: publishResult.publishedAt
|
||||
});
|
||||
}
|
||||
async function markNotificationFailed(notification, publishResult) {
|
||||
const next = mapMqttFailureState(notification.retry_count, publishResult.reason);
|
||||
await updateNotification(notification.id, {
|
||||
delivery_status: next.status,
|
||||
retry_count: next.retry_count,
|
||||
reason: next.reason,
|
||||
ack_status: "not_supported",
|
||||
next_retry_at: next.next_retry_at
|
||||
});
|
||||
}
|
||||
async function markNoDeviceFailure(notification) {
|
||||
await updateNotification(notification.id, {
|
||||
delivery_status: "failed",
|
||||
retry_count: 0,
|
||||
reason: "NOTIFICATION_NO_ACTIVE_BINDING",
|
||||
ack_status: "not_needed"
|
||||
});
|
||||
}
|
||||
async function resolveNotificationFromTransaction(notification) {
|
||||
const tx = await getTransactionById(notification.transaction_id);
|
||||
if (!tx) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
transaction_id: tx.id,
|
||||
merchant_id: tx.merchant_id,
|
||||
outlet_id: tx.outlet_id,
|
||||
terminal_id: tx.terminal_id,
|
||||
device_id: tx.device_id,
|
||||
amount: tx.amount,
|
||||
currency: tx.currency,
|
||||
paid_at: tx.paid_at,
|
||||
partner_reference: tx.partner_reference
|
||||
};
|
||||
}
|
||||
async function getMqttPayloadFromNotification(notification, paidAt) {
|
||||
return buildPaymentSuccessPayload({
|
||||
transaction_id: String(notification.payload_json.transaction_id || ""),
|
||||
merchant_id: String(notification.payload_json.merchant_id || ""),
|
||||
merchant_name: await getNotificationMerchantName(String(notification.payload_json.merchant_id || "")),
|
||||
device_id: String(notification.device_id || ""),
|
||||
amount: Number(notification.payload_json.amount || 0),
|
||||
currency: String(notification.payload_json.currency || "IDR"),
|
||||
paid_at: paidAt,
|
||||
partner_reference: String(notification.payload_json.partner_reference || ""),
|
||||
event_id: notification.event_id
|
||||
});
|
||||
}
|
||||
async function publishNotificationNow(notification, eventPayload) {
|
||||
if (!notification.device_id) {
|
||||
const resolvedFromTransaction = await resolveNotificationFromTransaction(notification);
|
||||
if (!resolvedFromTransaction) {
|
||||
await markNoDeviceFailure(notification);
|
||||
return;
|
||||
}
|
||||
const resolved = await resolveNotificationDevice(resolvedFromTransaction);
|
||||
if (!resolved) {
|
||||
await markNoDeviceFailure(notification);
|
||||
return;
|
||||
}
|
||||
notification = await updateNotification(notification.id, {
|
||||
device_id: resolved.deviceId,
|
||||
delivery_status: "queued",
|
||||
reason: undefined
|
||||
});
|
||||
}
|
||||
const effectivePayload = eventPayload ?? (await resolveNotificationFromTransaction(notification));
|
||||
if (!effectivePayload) {
|
||||
await markNoDeviceFailure(notification);
|
||||
return;
|
||||
}
|
||||
const mqttPayload = await buildPaymentSuccessPayload({
|
||||
transaction_id: effectivePayload.transaction_id,
|
||||
merchant_id: effectivePayload.merchant_id,
|
||||
merchant_name: await getNotificationMerchantName(effectivePayload.merchant_id),
|
||||
device_id: notification.device_id || String(effectivePayload.device_id || ""),
|
||||
amount: effectivePayload.amount,
|
||||
currency: effectivePayload.currency,
|
||||
paid_at: effectivePayload.paid_at,
|
||||
partner_reference: effectivePayload.partner_reference,
|
||||
event_id: notification.event_id
|
||||
});
|
||||
const result = await publishPaymentSuccess(mqttPayload);
|
||||
if (!result.ok) {
|
||||
await markNotificationFailed(notification, result);
|
||||
return;
|
||||
}
|
||||
await markNotificationSent(notification, result);
|
||||
}
|
||||
async function onTransactionPaid(event) {
|
||||
const payload = event.payload_json;
|
||||
const existing = await getNotificationByTransactionAndEvent(payload.transaction_id, event.id);
|
||||
if (existing) {
|
||||
return;
|
||||
}
|
||||
const resolved = await resolveNotificationDevice(payload);
|
||||
const deliveryStatus = resolveDeliveryStatus(resolved);
|
||||
const created = await createNotification({
|
||||
transaction_id: payload.transaction_id,
|
||||
device_id: resolved?.deviceId || null,
|
||||
event_id: event.id,
|
||||
delivery_status: deliveryStatus,
|
||||
reason: resolveFailureReason(resolved),
|
||||
payload_json: buildNotificationPayload(payload, resolved?.deviceId || ""),
|
||||
ack_status: resolved ? "not_supported" : "not_needed"
|
||||
});
|
||||
await publishNotificationNow(created, payload);
|
||||
}
|
||||
async function bootstrapNotificationForPaidTransaction(transaction) {
|
||||
const existing = await getNotificationByTransactionId(transaction.id);
|
||||
if (existing) {
|
||||
return;
|
||||
}
|
||||
const payload = {
|
||||
transaction_id: transaction.id,
|
||||
merchant_id: transaction.merchant_id,
|
||||
outlet_id: transaction.outlet_id,
|
||||
terminal_id: transaction.terminal_id,
|
||||
device_id: transaction.device_id,
|
||||
amount: transaction.amount,
|
||||
currency: transaction.currency,
|
||||
partner_reference: transaction.partner_reference,
|
||||
paid_at: transaction.paid_at
|
||||
};
|
||||
const eventId = `bootstrap_${transaction.id}`;
|
||||
const resolved = await resolveNotificationDevice(payload);
|
||||
const deliveryStatus = resolveDeliveryStatus(resolved);
|
||||
const created = await createNotification({
|
||||
transaction_id: transaction.id,
|
||||
device_id: resolved?.deviceId || null,
|
||||
event_id: eventId,
|
||||
delivery_status: deliveryStatus,
|
||||
reason: resolveFailureReason(resolved),
|
||||
payload_json: buildNotificationPayload(payload, resolved?.deviceId || ""),
|
||||
ack_status: resolved ? "not_supported" : "not_needed"
|
||||
});
|
||||
await publishNotificationNow(created, payload);
|
||||
}
|
||||
async function seedPaidTransactions() {
|
||||
const paidTransactions = await listTransactions({ status: "paid" });
|
||||
for (const tx of paidTransactions) {
|
||||
await bootstrapNotificationForPaidTransaction(toTransactionPayload(tx));
|
||||
}
|
||||
}
|
||||
async function processRetryCycle() {
|
||||
const now = new Date().toISOString();
|
||||
const retrying = await listNotifications({
|
||||
delivery_status: "retrying"
|
||||
});
|
||||
const due = retrying.filter((notification) => {
|
||||
if (!notification.next_retry_at) {
|
||||
return true;
|
||||
}
|
||||
return notification.next_retry_at <= now;
|
||||
});
|
||||
for (const notification of due) {
|
||||
await publishNotificationNow(notification, null);
|
||||
}
|
||||
}
|
||||
export async function retryNotificationByTransactionId(transactionId) {
|
||||
const tx = await getTransactionById(transactionId);
|
||||
if (!tx) {
|
||||
return null;
|
||||
}
|
||||
if (tx.status !== "paid") {
|
||||
throw new Error("NOTIFICATION_PUBLISH_CONDITION");
|
||||
}
|
||||
const notification = await getNotificationByTransactionId(transactionId);
|
||||
if (!notification) {
|
||||
return null;
|
||||
}
|
||||
if (notification.delivery_status === "acknowledged") {
|
||||
return toNotificationPayload(notification);
|
||||
}
|
||||
await publishNotificationNow(notification, null);
|
||||
const updated = await getNotificationByTransactionId(transactionId);
|
||||
return updated ? toNotificationPayload(updated) : null;
|
||||
}
|
||||
export function startNotificationOrchestrator() {
|
||||
void seedPaidTransactions();
|
||||
void subscribeTransactionPaid((event) => {
|
||||
void onTransactionPaid(event);
|
||||
});
|
||||
setInterval(() => {
|
||||
void processRetryCycle();
|
||||
}, RETRY_POLL_INTERVAL_MS);
|
||||
}
|
||||
Reference in New Issue
Block a user