/**
 * Smoke test against a real MQTT broker.
 *
 * The script spawns the backend with `npm start` (MQTT_PUBLISH_MODE=broker),
 * subscribes to `devices/+/downlink/#`, drives three flows through the HTTP API
 * (payment callback, device config push, dynamic QR request) and waits for the
 * matching downlink message of each flow to arrive on the broker. Rows created
 * along the way are deleted from PostgreSQL afterwards.
 */
import { spawn } from "node:child_process";
import { createHmac } from "node:crypto";
import { Pool } from "pg";
import mqtt from "mqtt";
import "dotenv/config";

const PORT = process.env.PORT || "3115";
const BASE = process.env.BASE_URL || `http://127.0.0.1:${PORT}`;
const ADMIN_TOKEN = process.env.ADMIN_TOKEN || "admin-dev-token";
const DEVICE_TOKEN = process.env.DEVICE_TOKEN || "device-dev-token";
const SECRET = process.env.INTEGRATION_WEBHOOK_SECRET || "dev-callback-secret";
const BROKER_URL = process.env.MQTT_BROKER_URL;
const MQTT_USERNAME = process.env.MQTT_USERNAME;
const MQTT_PASSWORD = process.env.MQTT_PASSWORD;
// Suffix with the current time so every run uses a distinct client id.
const MQTT_CLIENT_ID = `${process.env.MQTT_CLIENT_ID || "qris-platform-backend"}-real-smoke-${Date.now()}`;

const DOWNLINK_TOPIC_FILTER = "devices/+/downlink/#";
const LOG_PREVIEW_CHARS = 220;
const HEALTH_MAX_ATTEMPTS = 100;
const HEALTH_RETRY_DELAY_MS = 200;
const MQTT_CONNECT_TIMEOUT_MS = 10000;
const MESSAGE_TIMEOUT_MS = 12000;
const MESSAGE_POLL_INTERVAL_MS = 100;
const SERVER_SHUTDOWN_GRACE_MS = 500;

// Identifiers recorded while the test runs; cleanup() deletes the matching rows.
const created = { merchantIds: [], dynamicPartnerReferences: [], mqttCorrelationIds: [] };

// Set once cleanup() has run all of its deletes, so the failure handler at the
// bottom of the file does not repeat the work main() already did.
let cleanupCompleted = false;

/**
 * Render a value as a short single-line string for log output.
 *
 * @param {unknown} data - A string (used as-is) or any JSON-serializable value.
 * @returns {string} The text, truncated to 220 characters plus "..." when longer.
 */
function short(data) {
  // `??` (not `||`) so falsy-but-real bodies such as 0 or false are still shown.
  const json = typeof data === "string" ? data : JSON.stringify(data ?? {});
  return json.length > LOG_PREVIEW_CHARS ? `${json.slice(0, LOG_PREVIEW_CHARS)}...` : json;
}

/**
 * Resolve after the given delay.
 *
 * @param {number} ms - Delay in milliseconds.
 * @returns {Promise<void>}
 */
function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Send a JSON request to the server under test and log the outcome.
 *
 * @param {string} path - Path appended to BASE.
 * @param {object} [options]
 * @param {string} [options.method] - HTTP method, "GET" when omitted.
 * @param {object} [options.headers] - Extra headers merged over Content-Type.
 * @param {unknown} [options.body] - JSON-serialized when the key is present.
 * @param {string} [options._label] - Label used in log and error messages.
 * @returns {Promise<unknown>} `body.data` when the parsed body has a defined
 *   `data` property, otherwise the parsed body (raw text when it is not JSON).
 * @throws {Error} When the response status is not 2xx.
 */
async function req(path, options = {}) {
  const method = options.method || "GET";
  const response = await fetch(`${BASE}${path}`, {
    method,
    headers: { "Content-Type": "application/json", ...(options.headers || {}) },
    // Key presence (not truthiness) decides, so an explicit null body is still sent.
    body: Object.hasOwn(options, "body") ? JSON.stringify(options.body) : undefined
  });

  const text = await response.text();
  let body = null;
  try {
    body = text ? JSON.parse(text) : null;
  } catch {
    // Not JSON: keep the raw text so it still shows up in logs and errors.
    body = text;
  }

  if (!response.ok) {
    throw new Error(`${options._label || path} failed: ${response.status} ${short(body)}`);
  }

  console.log(`${options._label || `${method} ${path}`} => ${response.status} ${short(body)}`);
  return body?.data !== undefined ? body.data : body;
}

/**
 * req() with the admin bearer token attached.
 *
 * @param {string} path
 * @param {object} [options] - Same shape as req()'s options.
 * @returns {Promise<unknown>}
 */
function reqAdmin(path, options = {}) {
  return req(path, {
    ...options,
    headers: { ...(options.headers || {}), Authorization: `Bearer ${ADMIN_TOKEN}` }
  });
}

/**
 * req() with the device bearer token attached.
 *
 * @param {string} path
 * @param {object} [options] - Same shape as req()'s options.
 * @returns {Promise<unknown>}
 */
function reqDevice(path, options = {}) {
  return req(path, {
    ...options,
    headers: { ...(options.headers || {}), Authorization: `Bearer ${DEVICE_TOKEN}` }
  });
}

/**
 * Poll GET /health until it succeeds.
 *
 * @returns {Promise<void>}
 * @throws {Error} "server health timeout" after 100 failed attempts spaced 200 ms apart.
 */
async function waitForHealth() {
  for (let attempt = 0; attempt < HEALTH_MAX_ATTEMPTS; attempt += 1) {
    try {
      await req("/health", { _label: "GET /health" });
      return;
    } catch {
      // Expected while the server is still starting; retry after a short pause.
      await sleep(HEALTH_RETRY_DELAY_MS);
    }
  }
  throw new Error("server health timeout");
}

/**
 * Wait for the MQTT client's first "connect" event.
 *
 * @param {object} client - Client returned by mqtt.connect().
 * @returns {Promise<void>}
 * @throws {Error} The client's "error" event payload, or MQTT_CONNECT_TIMEOUT
 *   when neither event fires within 10 seconds.
 */
function waitForMqtt(client) {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error("MQTT_CONNECT_TIMEOUT")), MQTT_CONNECT_TIMEOUT_MS);
    client.once("connect", () => {
      clearTimeout(timer);
      resolve();
    });
    client.once("error", (error) => {
      clearTimeout(timer);
      reject(error);
    });
  });
}

/**
 * Subscribe to a topic filter at QoS 1.
 *
 * @param {object} client - Connected MQTT client.
 * @param {string} topic - Topic or topic filter.
 * @returns {Promise<void>} Rejects with the error passed to the subscribe callback.
 */
function subscribe(client, topic) {
  return new Promise((resolve, reject) => {
    client.subscribe(topic, { qos: 1 }, (error) => {
      if (error) {
        reject(error);
        return;
      }
      resolve();
    });
  });
}

/**
 * Wait until the shared message buffer contains an entry matching `predicate`.
 *
 * The buffer is checked immediately and then every 100 ms. Both timers are
 * cleared on success and on timeout, so no poller keeps running after the
 * promise has settled.
 *
 * @param {Array<{topic: string, payload: unknown}>} messages - Buffer that the
 *   MQTT "message" handler appends to.
 * @param {(item: {topic: string, payload: unknown}) => boolean} predicate
 * @param {string} label - Prefix for the timeout error message.
 * @returns {Promise<{topic: string, payload: unknown}>} The first matching entry.
 * @throws {Error} `${label} message timeout` after 12 seconds without a match.
 */
function waitForMessage(messages, predicate, label) {
  return new Promise((resolve, reject) => {
    let poller = null;

    const deadline = setTimeout(() => {
      // Stop polling too; otherwise the interval would outlive the rejection.
      clearInterval(poller);
      reject(new Error(`${label} message timeout`));
    }, MESSAGE_TIMEOUT_MS);

    const settleIfFound = () => {
      const found = messages.find(predicate);
      if (!found) {
        return false;
      }
      clearInterval(poller);
      clearTimeout(deadline);
      resolve(found);
      return true;
    };

    // The message may already be buffered; skip the first polling delay then.
    if (settleIfFound()) {
      return;
    }
    poller = setInterval(settleIfFound, MESSAGE_POLL_INTERVAL_MS);
  });
}

/**
 * Delete the database rows this smoke test created.
 *
 * Connects with DATABASE_URL when set, otherwise with the PG* variables (local
 * defaults). Rows are matched by the identifiers recorded in `created` and by
 * the smoke-test naming patterns. Sets `cleanupCompleted` once every delete
 * has succeeded. The pool is always closed.
 *
 * @returns {Promise<void>}
 */
async function cleanup() {
  const poolConfig = process.env.DATABASE_URL
    ? { connectionString: process.env.DATABASE_URL }
    : {
        host: process.env.PGHOST || "127.0.0.1",
        port: Number(process.env.PGPORT || 5432),
        user: process.env.PGUSER || "postgres",
        password: process.env.PGPASSWORD || "postgres",
        database: process.env.PGDATABASE || "qris_soundbox_platform"
      };
  const pool = new Pool(poolConfig);

  try {
    const dynamic = await pool.query(
      "DELETE FROM transactions WHERE partner_reference = ANY($1::text[]) RETURNING id",
      [created.dynamicPartnerReferences]
    );
    const merchants = await pool.query(
      "DELETE FROM merchants WHERE id = ANY($1::text[]) OR legal_name LIKE 'Real MQTT Smoke Merchant %' RETURNING id",
      [created.merchantIds]
    );
    const mqttMessages = await pool.query(
      "DELETE FROM mqtt_messages WHERE correlation_id = ANY($1::text[]) OR payload_json::text LIKE '%real-mqtt-smoke-%' RETURNING id",
      [created.mqttCorrelationIds]
    );
    console.log(
      `cleanup => dynamic_transactions=${dynamic.rowCount} merchants=${merchants.rowCount} mqtt_messages=${mqttMessages.rowCount}`
    );
    cleanupCompleted = true;
  } finally {
    await pool.end();
  }
}

/**
 * Create a merchant, an outlet, a terminal and a device through the admin API,
 * then bind the device to the other three.
 *
 * The merchant id is recorded in `created.merchantIds` as soon as the merchant
 * exists, so cleanup() can remove it even when a later step fails.
 *
 * @param {number} ts - Timestamp embedded in generated names and codes.
 * @param {string} suffix - Short tag distinguishing bundles within one run.
 * @param {string} [terminalMode="static"] - Sent as the terminal's `qr_mode`.
 * @param {string} [deviceMode="mqtt"] - Sent as the device's `communication_mode`.
 * @param {object} [capability={}] - Sent as the device's `capability_profile_json`.
 * @returns {Promise<{merchant: object, outlet: object, terminal: object, device: object}>}
 */
async function createMerchantBundle(ts, suffix, terminalMode = "static", deviceMode = "mqtt", capability = {}) {
  const merchant = await reqAdmin("/admin/merchants", {
    method: "POST",
    body: {
      legal_name: `Real MQTT Smoke Merchant ${suffix} ${ts}`,
      brand_name: `RMS-${suffix}-${ts}`,
      settlement_account_reference: `bank:${suffix}:${ts}`,
      settlement_account_type: "merchant_bank_account",
      payout_mode: "merchant_direct"
    },
    _label: `POST /admin/merchants ${suffix}`
  });
  created.merchantIds.push(merchant.id);

  const outlet = await reqAdmin(`/admin/merchants/${merchant.id}/outlets`, {
    method: "POST",
    body: { name: `Real MQTT Smoke Outlet ${suffix} ${ts}` },
    _label: `POST /admin/merchants/:id/outlets ${suffix}`
  });

  const terminal = await reqAdmin(`/admin/outlets/${outlet.id}/terminals`, {
    method: "POST",
    body: { terminal_code: `RMS-${suffix}-TERM-${ts}`, qr_mode: terminalMode },
    _label: `POST /admin/outlets/:id/terminals ${suffix}`
  });

  const device = await reqAdmin("/admin/devices", {
    method: "POST",
    body: {
      device_code: `RMS-${suffix}-DEV-${ts}`,
      vendor: "smoke",
      model: "mqtt-real",
      communication_mode: deviceMode,
      status: "active",
      capability_profile_json: capability
    },
    _label: `POST /admin/devices ${suffix}`
  });

  await reqAdmin(`/admin/devices/${device.id}/bind`, {
    method: "POST",
    body: { merchant_id: merchant.id, outlet_id: outlet.id, terminal_id: terminal.id },
    _label: `POST /admin/devices/:id/bind ${suffix}`
  });

  return { merchant, outlet, terminal, device };
}

/**
 * Run the smoke test end to end and exit the process with status 0 on success.
 *
 * Teardown (MQTT disconnect, server shutdown, database cleanup) runs whether
 * or not the flows succeed.
 *
 * @returns {Promise<void>}
 * @throws {Error} When broker credentials are missing, the server cannot be
 *   started, a request fails or an expected MQTT message does not arrive.
 */
async function main() {
  if (!BROKER_URL || !MQTT_USERNAME || !MQTT_PASSWORD) {
    throw new Error("MQTT_BROKER_URL, MQTT_USERNAME, and MQTT_PASSWORD are required");
  }

  const server = spawn("npm", ["start"], {
    cwd: process.cwd(),
    env: { ...process.env, PORT, MQTT_PUBLISH_MODE: "broker" },
    stdio: ["ignore", "pipe", "pipe"]
  });
  const serverLog = [];
  server.stdout.on("data", (chunk) => serverLog.push(chunk.toString()));
  server.stderr.on("data", (chunk) => serverLog.push(chunk.toString()));

  // A ChildProcess emits "error" when it cannot be spawned or killed. Without a
  // listener that event is thrown as an uncaught exception; turn it into a
  // rejection that fails the startup wait right away instead.
  const serverFailed = new Promise((_, reject) => {
    server.on("error", (error) => {
      reject(new Error(`server process error: ${error.message}`, { cause: error }));
    });
  });
  // Mark as handled so a late failure never surfaces as an unhandled rejection.
  serverFailed.catch(() => undefined);

  const messages = [];
  let client = null;

  try {
    // Connect inside the try block so that a synchronous failure here still
    // shuts down the server that was spawned above.
    client = mqtt.connect(BROKER_URL, {
      clientId: MQTT_CLIENT_ID,
      username: MQTT_USERNAME,
      password: MQTT_PASSWORD,
      connectTimeout: MQTT_CONNECT_TIMEOUT_MS,
      reconnectPeriod: 0,
      clean: true
    });
    client.on("message", (topic, payload) => {
      let parsed;
      try {
        parsed = JSON.parse(payload.toString());
      } catch {
        // Keep non-JSON payloads as text so they still count as received.
        parsed = payload.toString();
      }
      messages.push({ topic, payload: parsed });
    });

    await Promise.all([Promise.race([waitForHealth(), serverFailed]), waitForMqtt(client)]);
    await subscribe(client, DOWNLINK_TOPIC_FILTER);
    console.log(`MQTT subscribe => devices/+/downlink/#`);

    const ts = Date.now();

    // Flow 1: a paid callback for a static-QR transaction should produce a
    // payment_success downlink for the bound device.
    const staticBundle = await createMerchantBundle(ts, "PAY", "static", "mqtt", {
      flows: ["static_payment_notification"]
    });
    await reqDevice("/device/heartbeat", {
      method: "POST",
      body: {
        device_id: staticBundle.device.id,
        timestamp: new Date().toISOString(),
        network_strength: 88,
        battery_level: 77,
        state: "idle"
      },
      _label: "POST /device/heartbeat payment"
    });

    const partnerReference = `REAL-MQTT-SMOKE-PR-${ts}`;
    const paymentTx = await reqAdmin("/admin/transactions", {
      method: "POST",
      body: {
        partner_reference: partnerReference,
        merchant_id: staticBundle.merchant.id,
        outlet_id: staticBundle.outlet.id,
        terminal_id: staticBundle.terminal.id,
        device_id: staticBundle.device.id,
        amount: 32100,
        currency: "IDR",
        qr_mode: "static",
        initiation_mode: "static",
        status: "initiated"
      },
      _label: "POST /admin/transactions payment"
    });

    const callback = {
      partner_reference: partnerReference,
      partner_txn_id: `REAL-MQTT-SMOKE-PTX-${ts}`,
      amount: 32100,
      currency: "IDR",
      payment_status: "paid",
      status: "paid",
      paid_at: new Date().toISOString()
    };
    // Hex HMAC-SHA256 of the serialized callback, sent both as a header and as
    // a `signature` field in the body.
    const signature = createHmac("sha256", SECRET).update(JSON.stringify(callback)).digest("hex");
    await req("/integrations/qris/callback", {
      method: "POST",
      headers: { "X-Partner-Signature": signature },
      body: { ...callback, signature },
      _label: "POST /integrations/qris/callback payment"
    });

    const paymentMessage = await waitForMessage(
      messages,
      (item) =>
        item.topic === `devices/${staticBundle.device.id}/downlink/payment/success` &&
        item.payload?.message_type === "payment_success" &&
        item.payload?.transaction_id === paymentTx.id,
      "payment success"
    );

    // Flow 2: patching a device's config should produce a config_push downlink
    // carrying the same config_version.
    const apiBundle = await createMerchantBundle(ts, "CFG", "dynamic_api", "api", {
      features: { dynamic_qr: { api_direct: true } },
      flows: ["dynamic_qr:api_direct"]
    });
    const configVersion = Math.floor(Date.now() / 1000);
    await reqAdmin(`/admin/devices/${apiBundle.device.id}/config`, {
      method: "PATCH",
      body: {
        config_version: configVersion,
        settings: {
          volume: 62,
          language: "id-ID",
          heartbeat_interval_seconds: 45,
          test_marker: `real-mqtt-smoke-${configVersion}`
        }
      },
      _label: "PATCH /admin/devices/:id/config"
    });

    const configMessage = await waitForMessage(
      messages,
      (item) =>
        item.topic === `devices/${apiBundle.device.id}/downlink/config/push` &&
        item.payload?.message_type === "config_push" &&
        item.payload?.config_version === configVersion,
      "config push"
    );

    // Flow 3: a dynamic QR request uplink should produce a dynamic_qr_response
    // downlink correlated by request id.
    const mqttBundle = await createMerchantBundle(ts, "DYN", "dynamic_mqtt", "mqtt", {
      features: { dynamic_qr: { mqtt: true } },
      flows: ["dynamic_qr:mqtt"]
    });
    const requestId = `REAL-MQTT-SMOKE-DYN-${ts}`;
    // Record both identifiers before sending: if the request fails after the
    // server has stored rows, cleanup() can still find them. Deleting by an
    // identifier that was never stored matches nothing.
    created.mqttCorrelationIds.push(requestId);
    created.dynamicPartnerReferences.push(`DYN-${requestId}`);
    await reqDevice("/device/mqtt/uplink/dynamic-qr/request", {
      method: "POST",
      body: {
        message_type: "dynamic_qr_request",
        device_id: mqttBundle.device.id,
        terminal_id: mqttBundle.terminal.id,
        amount: 12345,
        currency: "IDR",
        request_id: requestId
      },
      _label: "POST /device/mqtt/uplink/dynamic-qr/request"
    });

    const dynamicMessage = await waitForMessage(
      messages,
      (item) =>
        item.topic === `devices/${mqttBundle.device.id}/downlink/dynamic-qr/response` &&
        item.payload?.message_type === "dynamic_qr_response" &&
        item.payload?.correlation_id === requestId,
      "dynamic QR response"
    );

    console.log(
      JSON.stringify(
        {
          broker_connect: "ok",
          subscribed_topic: "devices/+/downlink/#",
          payment_success: {
            topic: paymentMessage.topic,
            transaction_id: paymentMessage.payload.transaction_id,
            amount: paymentMessage.payload.amount
          },
          config_push: {
            topic: configMessage.topic,
            config_version: configMessage.payload.config_version
          },
          dynamic_qr_response: {
            topic: dynamicMessage.topic,
            transaction_id: dynamicMessage.payload.transaction_id,
            correlation_id: dynamicMessage.payload.correlation_id
          },
          received_messages_count: messages.length
        },
        null,
        2
      )
    );
  } finally {
    client?.end(true);
    server.kill("SIGTERM");
    await sleep(SERVER_SHUTDOWN_GRACE_MS);
    try {
      await cleanup();
    } finally {
      // Runs even when cleanup() throws, so a failed database cleanup cannot
      // skip the forced shutdown or the debug log dump.
      if (server.exitCode === null && server.signalCode === null) {
        // Neither an exit code nor a terminating signal is recorded yet.
        server.kill("SIGKILL");
      }
      if (serverLog.length && process.env.SMOKE_MQTT_REAL_DEBUG === "true") {
        console.log(serverLog.join(""));
      }
    }
  }

  process.exit(0);
}

main().catch(async (error) => {
  console.error(error instanceof Error ? error.message : error);
  // main() normally cleans up in its own finally block; run cleanup here only
  // when that did not happen or did not finish.
  if (!cleanupCompleted) {
    await cleanup().catch((cleanupError) => {
      // Best effort: report the failure but keep the original error as the result.
      console.error(`cleanup failed: ${cleanupError instanceof Error ? cleanupError.message : cleanupError}`);
    });
  }
  process.exit(1);
});