import { randomUUID } from "node:crypto";
import { getPool } from "../db/pool";

/** Page size used by `listMqttMessages` when the caller supplies no usable limit. */
const DEFAULT_LIST_LIMIT = 100;

/** Largest page size `listMqttMessages` will ever request. */
const MAX_LIST_LIMIT = 500;

/**
 * Columns `listMqttMessages` can filter on by equality.
 * The names are interpolated into SQL, so this list must stay a fixed,
 * code-owned whitelist and never be derived from caller input.
 */
const FILTERABLE_COLUMNS = ["device_id", "direction", "message_type", "correlation_id"];

/**
 * Current time as an ISO-8601 string (UTC, as produced by `Date#toISOString`).
 *
 * @returns {string}
 */
function nowIso() {
  return new Date().toISOString();
}

/**
 * Convert a `mqtt_messages` row into the message object returned to callers.
 *
 * Falsy `correlation_id` / `reason` values are exposed as `undefined`, and a
 * falsy `payload_json` is exposed as an empty object.
 *
 * @param {object} row - A row as returned by the pool's `query`.
 * @returns {object} The mapped message.
 */
function mapMessage(row) {
  return {
    id: row.id,
    direction: row.direction,
    device_id: row.device_id,
    topic: row.topic,
    message_type: row.message_type,
    correlation_id: row.correlation_id || undefined,
    payload_json: row.payload_json || {},
    publish_status: row.publish_status,
    reason: row.reason || undefined,
    created_at: row.created_at,
  };
}

/**
 * Normalise a caller-supplied page size to an integer in
 * [1, MAX_LIST_LIMIT].
 *
 * Missing, zero, or non-numeric values fall back to DEFAULT_LIST_LIMIT so the
 * query never receives `NaN`; fractional values are truncated because the
 * limit is sent as an integer bind parameter.
 *
 * @param {unknown} rawLimit - The `limit` value taken from the filter.
 * @returns {number} A safe integer limit.
 */
function resolveListLimit(rawLimit) {
  const numeric = Number(rawLimit);
  if (Number.isNaN(numeric) || numeric === 0) {
    return DEFAULT_LIST_LIMIT;
  }
  // Clamp first so +/-Infinity collapse onto the bounds, then drop any fraction.
  return Math.trunc(Math.min(Math.max(numeric, 1), MAX_LIST_LIMIT));
}

/**
 * Insert one row into `mqtt_messages` and return it in mapped form.
 *
 * The id is generated here as `mqtt_<uuid>` and `created_at` is set to the
 * current time. `publish_status` defaults to "recorded"; a falsy
 * `correlation_id` or `reason` is stored as NULL; a falsy `payload_json` is
 * stored as an empty object.
 *
 * @param {object} payload - Fields of the message to record.
 * @returns {Promise<object>} The inserted message, as produced by `mapMessage`.
 */
export async function createMqttMessage(payload) {
  // NOTE(review): if getPool() is a node-postgres Pool and payload_json is a
  // json/jsonb column, a top-level *array* payload would be sent as a Postgres
  // array literal rather than JSON. Confirm against the driver/schema and
  // JSON.stringify here if array payloads are expected.
  const values = [
    `mqtt_${randomUUID()}`,
    payload.direction,
    payload.device_id,
    payload.topic,
    payload.message_type,
    payload.correlation_id || null,
    payload.payload_json || {},
    payload.publish_status || "recorded",
    payload.reason || null,
    nowIso(),
  ];

  const { rows } = await getPool().query(
    `INSERT INTO mqtt_messages (
       id, direction, device_id, topic, message_type,
       correlation_id, payload_json, publish_status, reason, created_at
     ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
     RETURNING *`,
    values,
  );
  return mapMessage(rows[0]);
}

/**
 * List stored MQTT messages, newest first.
 *
 * Each truthy filter field among `device_id`, `direction`, `message_type` and
 * `correlation_id` adds an equality condition; conditions are ANDed together.
 * `limit` is clamped to [1, 500] and defaults to 100 when absent or unusable.
 *
 * @param {object} [filter] - Optional filter fields plus an optional `limit`.
 * @returns {Promise<object[]>} Matching messages, mapped with `mapMessage`.
 */
export async function listMqttMessages(filter) {
  const clauses = [];
  const params = [];

  for (const column of FILTERABLE_COLUMNS) {
    const value = filter?.[column];
    if (value) {
      params.push(value);
      // The placeholder index is the 1-based position of the value just pushed.
      clauses.push(`${column} = $${params.length}`);
    }
  }

  // The limit is bound like every other value instead of being spliced into
  // the SQL text.
  params.push(resolveListLimit(filter?.limit));
  const limitPlaceholder = `$${params.length}`;

  const where = clauses.length > 0 ? `WHERE ${clauses.join(" AND ")}` : "";
  const { rows } = await getPool().query(
    `SELECT * FROM mqtt_messages ${where} ORDER BY created_at DESC LIMIT ${limitPlaceholder}`,
    params,
  );
  return rows.map(mapMessage);
}

/**
 * Produce the outward-facing payload for a message: a shallow copy, so callers
 * can add or replace top-level fields without touching the original object.
 *
 * @param {object} message - A mapped message.
 * @returns {object} A shallow copy of `message`.
 */
export function toMqttMessagePayload(message) {
  return { ...message };
}