// Source file: Qris-Soundbox/scripts/smoke-mqtt-real.mjs
// (repository viewer metadata: 425 lines, 12 KiB, JavaScript)

import { spawn } from "node:child_process";
import { createHmac } from "node:crypto";
import { Pool } from "pg";
import mqtt from "mqtt";
import "dotenv/config";
// ---- Runtime configuration: environment variables with local-development fallbacks ----
// Port handed to the spawned server (see the env passed to spawn() in main()).
const PORT = process.env.PORT || "3115";
// Base URL every req() call is issued against; defaults to the local server on PORT.
const BASE = process.env.BASE_URL || `http://127.0.0.1:${PORT}`;
// Bearer token attached by reqAdmin().
const ADMIN_TOKEN = process.env.ADMIN_TOKEN || "admin-dev-token";
// Bearer token attached by reqDevice().
const DEVICE_TOKEN = process.env.DEVICE_TOKEN || "device-dev-token";
// HMAC-SHA256 key used by main() to sign the simulated payment callback.
const SECRET = process.env.INTEGRATION_WEBHOOK_SECRET || "dev-callback-secret";
// Broker connection settings; main() refuses to run when any of the three is missing.
const BROKER_URL = process.env.MQTT_BROKER_URL;
const MQTT_USERNAME = process.env.MQTT_USERNAME;
const MQTT_PASSWORD = process.env.MQTT_PASSWORD;
// Client id for this script's own MQTT connection; the timestamp suffix makes it differ per run.
const MQTT_CLIENT_ID = `${process.env.MQTT_CLIENT_ID || "qris-platform-backend"}-real-smoke-${Date.now()}`;
// Identifiers of records created during the run; cleanup() uses them to delete the rows again.
const created = {
merchantIds: [],
dynamicPartnerReferences: [],
mqttCorrelationIds: []
};
/**
 * Produce a compact, single-string rendering of a value for log lines.
 *
 * Strings are used as-is; anything else is JSON-serialised (falsy values are
 * rendered as "{}"). Output longer than 220 characters is cut and suffixed
 * with "...".
 *
 * @param {*} data - Value to render.
 * @returns {string} The (possibly truncated) text.
 */
function short(data) {
  let text;
  if (typeof data === "string") {
    text = data;
  } else {
    text = JSON.stringify(data || {});
  }
  if (text.length <= 220) {
    return text;
  }
  return `${text.slice(0, 220)}...`;
}
/**
 * Pause for the given number of milliseconds.
 *
 * @param {number} ms - Delay before the returned promise resolves.
 * @returns {Promise<void>} Promise resolved (with no value) once the timer fires.
 */
function sleep(ms) {
  return new Promise((done) => {
    setTimeout(done, ms);
  });
}
/**
 * Issue a JSON HTTP request against BASE and return the decoded response.
 *
 * @param {string} path - Path appended to BASE.
 * @param {object} [options]
 * @param {string} [options.method="GET"] - HTTP method.
 * @param {object} [options.headers] - Extra headers merged over the JSON content type.
 * @param {*} [options.body] - Serialised with JSON.stringify when the key is present.
 * @param {string} [options._label] - Label used in log and error messages.
 * @returns {Promise<*>} The `data` member of the decoded body when it is defined,
 *   otherwise the decoded body itself (raw text if the body is not valid JSON,
 *   null if the body is empty).
 * @throws {Error} When the response status is not OK.
 */
async function req(path, options = {}) {
  const method = options.method || "GET";
  const headers = {
    "Content-Type": "application/json",
    ...(options.headers || {})
  };
  // Only send a body when the caller supplied the key, even if its value is falsy.
  const hasBody = Object.prototype.hasOwnProperty.call(options, "body");
  const body = hasBody ? JSON.stringify(options.body) : undefined;

  const response = await fetch(`${BASE}${path}`, { method, headers, body });

  const raw = await response.text();
  let parsed = null;
  if (raw) {
    try {
      parsed = JSON.parse(raw);
    } catch {
      // Not JSON: keep the raw text so it still shows up in logs and errors.
      parsed = raw;
    }
  }

  if (!response.ok) {
    throw new Error(`${options._label || path} failed: ${response.status} ${short(parsed)}`);
  }

  const label = options._label || `${method} ${path}`;
  console.log(`${label} => ${response.status} ${short(parsed)}`);

  const payload = parsed?.data;
  return payload === undefined ? parsed : payload;
}
/**
 * Call req() with the admin bearer token attached.
 *
 * Any caller-supplied Authorization header is overridden by ADMIN_TOKEN.
 *
 * @param {string} path - Path passed through to req().
 * @param {object} [options] - Options passed through to req().
 * @returns {Promise<*>} Whatever req() returns.
 */
function reqAdmin(path, options = {}) {
  const headers = Object.assign({}, options.headers, {
    Authorization: `Bearer ${ADMIN_TOKEN}`
  });
  return req(path, Object.assign({}, options, { headers }));
}
/**
 * Call req() with the device bearer token attached.
 *
 * Any caller-supplied Authorization header is overridden by DEVICE_TOKEN.
 *
 * @param {string} path - Path passed through to req().
 * @param {object} [options] - Options passed through to req().
 * @returns {Promise<*>} Whatever req() returns.
 */
function reqDevice(path, options = {}) {
  const authorization = { Authorization: `Bearer ${DEVICE_TOKEN}` };
  const headers = { ...(options.headers || {}), ...authorization };
  const merged = { ...options, headers };
  return req(path, merged);
}
/**
 * Poll GET /health until it succeeds.
 *
 * Makes up to 100 attempts and sleeps 200 ms after every failed one.
 *
 * @returns {Promise<void>} Resolves as soon as one health request succeeds.
 * @throws {Error} "server health timeout" when every attempt fails.
 */
async function waitForHealth() {
  let attemptsLeft = 100;
  while (attemptsLeft > 0) {
    attemptsLeft -= 1;
    try {
      await req("/health", { _label: "GET /health" });
      return;
    } catch {
      // Server not answering yet: back off briefly and retry.
      await sleep(200);
    }
  }
  throw new Error("server health timeout");
}
/**
 * Wait for an MQTT client to report its first "connect" event.
 *
 * @param {object} client - Object exposing `once(event, handler)`.
 * @returns {Promise<void>} Resolves on "connect"; rejects with the emitted
 *   error on "error", or with Error("MQTT_CONNECT_TIMEOUT") after 10 seconds.
 */
function waitForMqtt(client) {
  return new Promise((resolve, reject) => {
    const onTimeout = () => reject(new Error("MQTT_CONNECT_TIMEOUT"));
    const deadline = setTimeout(onTimeout, 10000);

    const onConnect = () => {
      clearTimeout(deadline);
      resolve();
    };
    const onError = (error) => {
      clearTimeout(deadline);
      reject(error);
    };

    client.once("connect", onConnect);
    client.once("error", onError);
  });
}
/**
 * Promise wrapper around `client.subscribe` using QoS 1.
 *
 * @param {object} client - Object exposing `subscribe(topic, options, callback)`.
 * @param {string} topic - Topic filter to subscribe to.
 * @returns {Promise<void>} Resolves with no value when the callback reports
 *   no error; rejects with the error otherwise.
 */
function subscribe(client, topic) {
  return new Promise((resolve, reject) => {
    const onDone = (error) => {
      if (!error) {
        resolve();
      } else {
        reject(error);
      }
    };
    client.subscribe(topic, { qos: 1 }, onDone);
  });
}
/**
 * Wait until the shared `messages` array contains an entry matching `predicate`.
 *
 * The array is checked once immediately and then polled, because it is filled
 * asynchronously by the MQTT "message" handler.
 *
 * @param {Array<*>} messages - Live array that another handler appends to.
 * @param {(item: *) => boolean} predicate - Match test applied to each entry.
 * @param {string} label - Human-readable name used in the timeout error.
 * @param {number} [timeoutMs=12000] - Maximum time to wait before rejecting.
 * @param {number} [pollMs=100] - Interval between checks of `messages`.
 * @returns {Promise<*>} Resolves with the first matching entry; rejects with
 *   Error("<label> message timeout") when nothing matches in time.
 */
function waitForMessage(messages, predicate, label, timeoutMs = 12000, pollMs = 100) {
  return new Promise((resolve, reject) => {
    // Fast path: the message may already have arrived before we started waiting.
    const alreadyThere = messages.find(predicate);
    if (alreadyThere) {
      resolve(alreadyThere);
      return;
    }

    let interval;
    const timer = setTimeout(() => {
      // Stop polling on timeout; otherwise the interval keeps firing forever
      // and keeps the event loop alive.
      clearInterval(interval);
      reject(new Error(`${label} message timeout`));
    }, timeoutMs);

    interval = setInterval(() => {
      const found = messages.find(predicate);
      if (found) {
        clearInterval(interval);
        clearTimeout(timer);
        resolve(found);
      }
    }, pollMs);
  });
}
/**
 * Delete the database rows produced by a smoke run.
 *
 * Connects with DATABASE_URL when set, otherwise with the PG* environment
 * variables (falling back to local defaults). Runs three DELETE statements in
 * order (transactions, merchants, mqtt_messages), logs the affected row
 * counts, and always closes the pool.
 *
 * @returns {Promise<void>}
 */
async function cleanup() {
  const { DATABASE_URL, PGHOST, PGPORT, PGUSER, PGPASSWORD, PGDATABASE } = process.env;

  let poolConfig;
  if (DATABASE_URL) {
    poolConfig = { connectionString: DATABASE_URL };
  } else {
    poolConfig = {
      host: PGHOST || "127.0.0.1",
      port: Number(PGPORT || 5432),
      user: PGUSER || "postgres",
      password: PGPASSWORD || "postgres",
      database: PGDATABASE || "qris_soundbox_platform"
    };
  }

  const pool = new Pool(poolConfig);
  try {
    const { rowCount: transactionCount } = await pool.query(
      "DELETE FROM transactions WHERE partner_reference = ANY($1::text[]) RETURNING id",
      [created.dynamicPartnerReferences]
    );
    // Also matches merchants by the smoke-test name prefix, not only by tracked id.
    const { rowCount: merchantCount } = await pool.query(
      "DELETE FROM merchants WHERE id = ANY($1::text[]) OR legal_name LIKE 'Real MQTT Smoke Merchant %' RETURNING id",
      [created.merchantIds]
    );
    // Also matches messages whose payload carries the smoke-test marker string.
    const { rowCount: messageCount } = await pool.query(
      "DELETE FROM mqtt_messages WHERE correlation_id = ANY($1::text[]) OR payload_json::text LIKE '%real-mqtt-smoke-%' RETURNING id",
      [created.mqttCorrelationIds]
    );
    console.log(
      `cleanup => dynamic_transactions=${transactionCount} merchants=${merchantCount} mqtt_messages=${messageCount}`
    );
  } finally {
    await pool.end();
  }
}
/**
 * Create a merchant, outlet, terminal and device through the admin API and
 * bind the device to the other three.
 *
 * The new merchant id is recorded in `created.merchantIds` right after the
 * merchant is created, so cleanup() can remove it even if a later step fails.
 *
 * @param {number|string} ts - Run marker embedded in every generated name/code.
 * @param {string} suffix - Short scenario tag embedded in names and log labels.
 * @param {string} [terminalMode="static"] - Sent as the terminal's `qr_mode`.
 * @param {string} [deviceMode="mqtt"] - Sent as the device's `communication_mode`.
 * @param {object} [capability={}] - Sent as the device's `capability_profile_json`.
 * @returns {Promise<{merchant: *, outlet: *, terminal: *, device: *}>} The four
 *   records as returned by the admin API.
 */
async function createMerchantBundle(ts, suffix, terminalMode = "static", deviceMode = "mqtt", capability = {}) {
  // Every step is an admin POST whose log label is "POST <route> <suffix>".
  const post = (path, route, body) =>
    reqAdmin(path, { method: "POST", body, _label: `POST ${route} ${suffix}` });

  const merchant = await post("/admin/merchants", "/admin/merchants", {
    legal_name: `Real MQTT Smoke Merchant ${suffix} ${ts}`,
    brand_name: `RMS-${suffix}-${ts}`,
    settlement_account_reference: `bank:${suffix}:${ts}`,
    settlement_account_type: "merchant_bank_account",
    payout_mode: "merchant_direct"
  });
  created.merchantIds.push(merchant.id);

  const outlet = await post(
    `/admin/merchants/${merchant.id}/outlets`,
    "/admin/merchants/:id/outlets",
    { name: `Real MQTT Smoke Outlet ${suffix} ${ts}` }
  );

  const terminal = await post(
    `/admin/outlets/${outlet.id}/terminals`,
    "/admin/outlets/:id/terminals",
    { terminal_code: `RMS-${suffix}-TERM-${ts}`, qr_mode: terminalMode }
  );

  const device = await post("/admin/devices", "/admin/devices", {
    device_code: `RMS-${suffix}-DEV-${ts}`,
    vendor: "smoke",
    model: "mqtt-real",
    communication_mode: deviceMode,
    status: "active",
    capability_profile_json: capability
  });

  await post(`/admin/devices/${device.id}/bind`, "/admin/devices/:id/bind", {
    merchant_id: merchant.id,
    outlet_id: outlet.id,
    terminal_id: terminal.id
  });

  return { merchant, outlet, terminal, device };
}
/**
 * Run the end-to-end MQTT smoke test against a real broker.
 *
 * Steps:
 *  1. Spawn the backend with `npm start` (MQTT_PUBLISH_MODE=broker) and open an
 *     independent MQTT connection subscribed to `devices/+/downlink/#`.
 *  2. Static payment: create a bundle, send a heartbeat, create a transaction,
 *     post a signed "paid" callback and wait for the `payment_success` downlink.
 *  3. Config push: create an API-mode bundle, PATCH its config and wait for the
 *     `config_push` downlink.
 *  4. Dynamic QR: create an MQTT-mode bundle, post a dynamic-QR uplink and wait
 *     for the `dynamic_qr_response` downlink.
 *  5. Print a JSON summary, tear everything down and exit the process with 0.
 *
 * @returns {Promise<never>} Never resolves normally; the process exits on success.
 * @throws {Error} When broker settings are missing or any step fails/times out.
 */
async function main() {
  if (!BROKER_URL || !MQTT_USERNAME || !MQTT_PASSWORD) {
    throw new Error("MQTT_BROKER_URL, MQTT_USERNAME, and MQTT_PASSWORD are required");
  }

  const server = spawn("npm", ["start"], {
    cwd: process.cwd(),
    env: {
      ...process.env,
      PORT,
      MQTT_PUBLISH_MODE: "broker"
    },
    stdio: ["ignore", "pipe", "pipe"]
  });
  // Server output is buffered and only printed when SMOKE_MQTT_REAL_DEBUG is "true".
  const serverLog = [];
  server.stdout.on("data", (chunk) => serverLog.push(chunk.toString()));
  server.stderr.on("data", (chunk) => serverLog.push(chunk.toString()));

  const client = mqtt.connect(BROKER_URL, {
    clientId: MQTT_CLIENT_ID,
    username: MQTT_USERNAME,
    password: MQTT_PASSWORD,
    connectTimeout: 10000,
    reconnectPeriod: 0,
    clean: true
  });
  // Every downlink received is recorded here; waitForMessage() polls this array.
  const messages = [];
  client.on("message", (topic, payload) => {
    let parsed;
    try {
      parsed = JSON.parse(payload.toString());
    } catch {
      parsed = payload.toString();
    }
    messages.push({ topic, payload: parsed });
  });

  try {
    await Promise.all([waitForHealth(), waitForMqtt(client)]);
    await subscribe(client, "devices/+/downlink/#");
    console.log(`MQTT subscribe => devices/+/downlink/#`);
    const ts = Date.now();

    // --- Scenario 1: static payment notification ---
    const staticBundle = await createMerchantBundle(ts, "PAY", "static", "mqtt", {
      flows: ["static_payment_notification"]
    });
    await reqDevice("/device/heartbeat", {
      method: "POST",
      body: {
        device_id: staticBundle.device.id,
        timestamp: new Date().toISOString(),
        network_strength: 88,
        battery_level: 77,
        state: "idle"
      },
      _label: "POST /device/heartbeat payment"
    });
    // NOTE(review): this partner reference is not added to `created`, so cleanup()
    // does not delete the transaction by reference; presumably it is removed
    // together with its merchant. Confirm against the schema.
    const partnerReference = `REAL-MQTT-SMOKE-PR-${ts}`;
    const paymentTx = await reqAdmin("/admin/transactions", {
      method: "POST",
      body: {
        partner_reference: partnerReference,
        merchant_id: staticBundle.merchant.id,
        outlet_id: staticBundle.outlet.id,
        terminal_id: staticBundle.terminal.id,
        device_id: staticBundle.device.id,
        amount: 32100,
        currency: "IDR",
        qr_mode: "static",
        initiation_mode: "static",
        status: "initiated"
      },
      _label: "POST /admin/transactions payment"
    });
    const callback = {
      partner_reference: partnerReference,
      partner_txn_id: `REAL-MQTT-SMOKE-PTX-${ts}`,
      amount: 32100,
      currency: "IDR",
      payment_status: "paid",
      status: "paid",
      paid_at: new Date().toISOString()
    };
    // The signature covers the callback WITHOUT the `signature` field; it is
    // then sent both as a header and inside the body.
    const signature = createHmac("sha256", SECRET).update(JSON.stringify(callback)).digest("hex");
    await req("/integrations/qris/callback", {
      method: "POST",
      headers: { "X-Partner-Signature": signature },
      body: { ...callback, signature },
      _label: "POST /integrations/qris/callback payment"
    });
    const paymentMessage = await waitForMessage(
      messages,
      (item) =>
        item.topic === `devices/${staticBundle.device.id}/downlink/payment/success` &&
        item.payload?.message_type === "payment_success" &&
        item.payload?.transaction_id === paymentTx.id,
      "payment success"
    );

    // --- Scenario 2: config push ---
    const apiBundle = await createMerchantBundle(ts, "CFG", "dynamic_api", "api", {
      features: { dynamic_qr: { api_direct: true } },
      flows: ["dynamic_qr:api_direct"]
    });
    const configVersion = Math.floor(Date.now() / 1000);
    await reqAdmin(`/admin/devices/${apiBundle.device.id}/config`, {
      method: "PATCH",
      body: {
        config_version: configVersion,
        settings: {
          volume: 62,
          language: "id-ID",
          heartbeat_interval_seconds: 45,
          // Marker string that cleanup() matches when deleting mqtt_messages rows.
          test_marker: `real-mqtt-smoke-${configVersion}`
        }
      },
      _label: "PATCH /admin/devices/:id/config"
    });
    const configMessage = await waitForMessage(
      messages,
      (item) =>
        item.topic === `devices/${apiBundle.device.id}/downlink/config/push` &&
        item.payload?.message_type === "config_push" &&
        item.payload?.config_version === configVersion,
      "config push"
    );

    // --- Scenario 3: dynamic QR over MQTT ---
    const mqttBundle = await createMerchantBundle(ts, "DYN", "dynamic_mqtt", "mqtt", {
      features: { dynamic_qr: { mqtt: true } },
      flows: ["dynamic_qr:mqtt"]
    });
    const requestId = `REAL-MQTT-SMOKE-DYN-${ts}`;
    // Registered before the request so cleanup() covers it even if the call fails.
    created.mqttCorrelationIds.push(requestId);
    await reqDevice("/device/mqtt/uplink/dynamic-qr/request", {
      method: "POST",
      body: {
        message_type: "dynamic_qr_request",
        device_id: mqttBundle.device.id,
        terminal_id: mqttBundle.terminal.id,
        amount: 12345,
        currency: "IDR",
        request_id: requestId
      },
      _label: "POST /device/mqtt/uplink/dynamic-qr/request"
    });
    created.dynamicPartnerReferences.push(`DYN-${requestId}`);
    const dynamicMessage = await waitForMessage(
      messages,
      (item) =>
        item.topic === `devices/${mqttBundle.device.id}/downlink/dynamic-qr/response` &&
        item.payload?.message_type === "dynamic_qr_response" &&
        item.payload?.correlation_id === requestId,
      "dynamic QR response"
    );

    console.log(
      JSON.stringify(
        {
          broker_connect: "ok",
          subscribed_topic: "devices/+/downlink/#",
          payment_success: {
            topic: paymentMessage.topic,
            transaction_id: paymentMessage.payload.transaction_id,
            amount: paymentMessage.payload.amount
          },
          config_push: {
            topic: configMessage.topic,
            config_version: configMessage.payload.config_version
          },
          dynamic_qr_response: {
            topic: dynamicMessage.topic,
            transaction_id: dynamicMessage.payload.transaction_id,
            correlation_id: dynamicMessage.payload.correlation_id
          },
          received_messages_count: messages.length
        },
        null,
        2
      )
    );
  } finally {
    client.end(true);
    server.kill("SIGTERM");
    try {
      // Give the server a moment to shut down before touching the database.
      await sleep(500);
      await cleanup();
    } finally {
      // Runs even when cleanup() throws, so a failed cleanup cannot leave the
      // spawned server running. exitCode stays null for a signal-terminated
      // child, so signalCode is checked as well before escalating.
      if (server.exitCode === null && server.signalCode === null) {
        server.kill("SIGKILL");
      }
      if (serverLog.length && process.env.SMOKE_MQTT_REAL_DEBUG === "true") {
        console.log(serverLog.join(""));
      }
    }
  }
  process.exit(0);
}
// Entry point: run the smoke test; on failure print the reason, make a
// best-effort cleanup attempt and exit with a non-zero status.
main().catch(async (failure) => {
  const reason = failure instanceof Error ? failure.message : failure;
  console.error(reason);
  try {
    await cleanup();
  } catch {
    // Best effort only: a cleanup failure must not hide the original error.
  }
  process.exit(1);
});