chore: initial project import
Some checks failed
CI - Production Readiness / Verify (push) Has been cancelled

This commit is contained in:
Wira Basalamah
2026-04-21 09:29:29 +07:00
commit adde003fba
222 changed files with 37657 additions and 0 deletions

View File

@ -0,0 +1,685 @@
import { CampaignAudienceType, CampaignStatus, DeliveryStatus, OptInStatus, type Prisma } from "@prisma/client";
import { DEFAULT_MAX_SEND_ATTEMPTS, getRetryDelaySeconds, recalculateCampaignTotals, renderCampaignMessage } from "@/lib/campaign-utils";
import { writeAuditTrail } from "@/lib/audit";
import { prisma } from "@/lib/prisma";
import { sendOutboundTextMessage } from "@/lib/whatsapp-provider";
import { notifyCampaignRetryFailure } from "@/lib/job-alerts";
const DEFAULT_RETRIES_PER_RUN = 100;
const DEFAULT_CAMPAIGNS_PER_RUN = 20;
const DEFAULT_LOCK_TTL_SECONDS = 300;
const CAMPAIGN_RETRY_JOB_NAME = "campaign-retry-worker";
type CampaignAudienceContact = {
id: string;
fullName: string;
phoneNumber: string;
};
type CampaignRecipientWithContact = Prisma.CampaignRecipientGetPayload<{
include: { contact: { select: { id: true; fullName: true; phoneNumber: true } } };
}>;
type CampaignDispatchCampaignSeed = Prisma.BroadcastCampaignGetPayload<{ include: { channel: true; template: true } }>;
type CampaignDispatchResult = {
campaignId: string;
campaignName: string;
skippedReason: string | null;
seededRecipients: number;
processableRecipients: number;
attempted: number;
successful: number;
failed: number;
idle: boolean;
};
type CampaignRetryBatchOptions = {
tenantId?: string;
campaignId?: string;
actorUserId?: string | null;
actorIpAddress?: string | null;
actorUserAgent?: string | null;
now?: Date;
recipientBatchSize?: number;
maxCampaigns?: number;
};
type CampaignRetryBatchResult = {
runAt: string;
status: "completed" | "skipped_locked" | "failed";
processedCampaigns: number;
totalSeeded: number;
totalProcessable: number;
totalAttempted: number;
totalSuccessful: number;
totalFailed: number;
summaries: CampaignDispatchResult[];
error?: string;
};
export async function loadCampaignAudienceContacts(tenantId: string, campaign: {
audienceType: CampaignAudienceType;
segmentId: string | null;
}) {
if (campaign.audienceType === CampaignAudienceType.SEGMENT && campaign.segmentId) {
const segmentMembers = await prisma.segmentMember.findMany({
where: { segmentId: campaign.segmentId, tenantId },
include: { contact: { select: { id: true, fullName: true, phoneNumber: true } } }
});
return segmentMembers
.filter((member) => member.contact.phoneNumber.trim().length > 0)
.map((member) => ({
id: member.contact.id,
fullName: member.contact.fullName,
phoneNumber: member.contact.phoneNumber
})) as CampaignAudienceContact[];
}
const contacts = await prisma.contact.findMany({
where: {
tenantId,
optInStatus: {
not: OptInStatus.OPTED_OUT
}
},
orderBy: { fullName: "asc" },
select: { id: true, fullName: true, phoneNumber: true }
});
return contacts
.filter((contact) => contact.phoneNumber.trim().length > 0)
.map((contact) => ({ id: contact.id, fullName: contact.fullName, phoneNumber: contact.phoneNumber }));
}
function isRecipientDueForRetry(recipient: CampaignRecipientWithContact, now: Date) {
if (recipient.sendAttempts >= recipient.maxSendAttempts) {
return false;
}
if (!recipient.nextRetryAt) {
return true;
}
return recipient.nextRetryAt <= now;
}
function mapRecipientTemplateValues(contact: CampaignAudienceContact) {
return {
"1": contact.fullName,
"2": contact.phoneNumber
};
}
function getCampaignBackoffDate(now: Date, attempt: number) {
const delayInSeconds = getRetryDelaySeconds(attempt);
return new Date(now.getTime() + delayInSeconds * 1000);
}
function getBatchSize(raw: number | undefined, fallback: number) {
if (!raw || raw <= 0) {
return fallback;
}
if (!Number.isInteger(raw)) {
return fallback;
}
return raw;
}
function getPositiveIntFromEnv(key: string, fallback: number) {
const raw = process.env[key]?.trim();
const value = raw ? Number(raw) : fallback;
if (!Number.isInteger(value) || value <= 0) {
return fallback;
}
return value;
}
function safeStringify(value: unknown) {
try {
return JSON.stringify(value);
} catch {
return null;
}
}
async function upsertCampaignRecipientsSeed(campaign: CampaignDispatchCampaignSeed, tenantId: string) {
const audienceContacts = await loadCampaignAudienceContacts(tenantId, {
audienceType: campaign.audienceType,
segmentId: campaign.segmentId
});
const uniqueContacts = new Map<string, CampaignAudienceContact>();
for (const item of audienceContacts) {
if (!uniqueContacts.has(item.id) && item.phoneNumber) {
uniqueContacts.set(item.id, item);
}
}
return {
uniqueContacts,
payload: Array.from(uniqueContacts.values()).map((contact) => ({
tenantId,
campaignId: campaign.id,
contactId: contact.id,
phoneNumber: contact.phoneNumber,
sendStatus: DeliveryStatus.QUEUED,
sendAttempts: 0,
maxSendAttempts: DEFAULT_MAX_SEND_ATTEMPTS
}))
};
}
export async function dispatchCampaignById(params: {
campaignId: string;
tenantId: string;
actorUserId?: string | null;
actorIpAddress?: string | null;
actorUserAgent?: string | null;
maxRecipientsPerCampaign?: number;
now?: Date;
enforceSchedule?: boolean;
source?: "manual" | "scheduler";
}): Promise<CampaignDispatchResult> {
const now = params.now ?? new Date();
const limit = getBatchSize(params.maxRecipientsPerCampaign, DEFAULT_RETRIES_PER_RUN);
const campaign = await prisma.broadcastCampaign.findFirst({
where: { id: params.campaignId, tenantId: params.tenantId },
include: {
channel: true,
template: true
}
});
if (!campaign) {
return {
campaignId: params.campaignId,
campaignName: "-",
skippedReason: "campaign_not_found",
seededRecipients: 0,
processableRecipients: 0,
attempted: 0,
successful: 0,
failed: 0,
idle: true
};
}
if (campaign.status === CampaignStatus.CANCELED) {
return {
campaignId: campaign.id,
campaignName: campaign.name,
skippedReason: "campaign_not_ready",
seededRecipients: 0,
processableRecipients: 0,
attempted: 0,
successful: 0,
failed: 0,
idle: true
};
}
if (params.enforceSchedule && campaign.scheduledAt && campaign.scheduledAt > now) {
return {
campaignId: campaign.id,
campaignName: campaign.name,
skippedReason: "scheduled_not_ready",
seededRecipients: 0,
processableRecipients: 0,
attempted: 0,
successful: 0,
failed: 0,
idle: true
};
}
const campaignStatusSeed = campaign.scheduledAt && campaign.scheduledAt <= now
? CampaignStatus.PROCESSING
: campaign.status;
if (campaignStatusSeed !== campaign.status) {
await prisma.broadcastCampaign.update({
where: { id: campaign.id },
data: { status: campaignStatusSeed, startedAt: now }
});
campaign.status = campaignStatusSeed;
}
const existingRecipientCount = await prisma.campaignRecipient.count({
where: { campaignId: campaign.id }
});
const auditContext = {
tenantId: params.tenantId,
actorUserId: params.actorUserId,
ipAddress: params.actorIpAddress,
userAgent: params.actorUserAgent
};
let seededRecipients = 0;
if (existingRecipientCount === 0) {
const { uniqueContacts, payload } = await upsertCampaignRecipientsSeed(campaign, params.tenantId);
if (payload.length === 0) {
return {
campaignId: campaign.id,
campaignName: campaign.name,
skippedReason: "no_recipients",
seededRecipients: 0,
processableRecipients: 0,
attempted: 0,
successful: 0,
failed: 0,
idle: true
};
}
await prisma.$transaction([
prisma.campaignRecipient.createMany({ data: payload }),
prisma.broadcastCampaign.update({
where: { id: campaign.id },
data: {
totalRecipients: payload.length,
totalSent: 0,
totalDelivered: 0,
totalRead: 0,
totalFailed: 0,
status: campaignStatusSeed,
startedAt: now
}
})
]);
await writeAuditTrail({
tenantId: params.tenantId,
actorUserId: params.actorUserId,
entityType: "campaign",
entityId: campaign.id,
action: "campaign_recipients_seeded",
metadata: {
totalRecipients: payload.length,
audienceType: campaign.audienceType,
uniqueContactCount: uniqueContacts.size
},
ipAddress: params.actorIpAddress,
userAgent: params.actorUserAgent
});
seededRecipients = payload.length;
}
const retryCandidates = await prisma.campaignRecipient.findMany({
where: {
campaignId: campaign.id,
sendStatus: { in: [DeliveryStatus.QUEUED, DeliveryStatus.FAILED] },
OR: [{ nextRetryAt: null }, { nextRetryAt: { lte: now } }]
},
include: { contact: { select: { id: true, fullName: true, phoneNumber: true } } },
orderBy: { createdAt: "asc" },
take: limit
});
const processableRecipients = retryCandidates.filter((recipient) => isRecipientDueForRetry(recipient, now));
if (processableRecipients.length === 0) {
await recalculateCampaignTotals(campaign.id);
return {
campaignId: campaign.id,
campaignName: campaign.name,
skippedReason: null,
seededRecipients,
processableRecipients: 0,
attempted: 0,
successful: 0,
failed: 0,
idle: true
};
}
let success = 0;
let failed = 0;
await writeAuditTrail({
...auditContext,
entityType: "campaign",
entityId: campaign.id,
action: "campaign_dispatch_started",
metadata: { source: params.source ?? "manual", campaignId: campaign.id, candidateCount: processableRecipients.length }
});
for (const recipient of processableRecipients) {
const attemptAt = new Date();
let outboundFailureReason: string | null = null;
let outboundProvider = "unknown";
let providerMessageId: string | null = recipient.providerMessageId;
let isSuccess = false;
try {
const renderedBody = renderCampaignMessage(
campaign.template.bodyText,
mapRecipientTemplateValues({ id: recipient.contact.id, fullName: recipient.contact.fullName, phoneNumber: recipient.contact.phoneNumber })
);
const outbound = await sendOutboundTextMessage({
tenantId: params.tenantId,
channelId: campaign.channel.id,
channelProvider: campaign.channel.provider,
phoneNumberId: campaign.channel.phoneNumberId,
to: recipient.phoneNumber,
content: renderedBody,
messageId: recipient.id
});
outboundFailureReason = outbound.failureReason ?? null;
outboundProvider = outbound.provider;
providerMessageId = outbound.providerMessageId ?? providerMessageId;
isSuccess = outbound.success;
} catch (error) {
const reason = error instanceof Error ? error.message : "Unknown send error";
outboundFailureReason = reason;
isSuccess = false;
}
const attempt = recipient.sendAttempts + 1;
const hasRetry = attempt < recipient.maxSendAttempts && !isSuccess;
const nextRetryAt = hasRetry ? getCampaignBackoffDate(attemptAt, attempt) : null;
await prisma.campaignRecipient.update({
where: { id: recipient.id },
data: {
sendAttempts: attempt,
lastAttemptAt: attemptAt,
failureReason: outboundFailureReason,
providerMessageId,
sendStatus: isSuccess
? DeliveryStatus.SENT
: hasRetry
? DeliveryStatus.QUEUED
: DeliveryStatus.FAILED,
sentAt: isSuccess && !recipient.sentAt ? attemptAt : recipient.sentAt,
nextRetryAt
}
});
await writeAuditTrail({
...auditContext,
entityType: "campaign_recipient",
entityId: recipient.id,
action: isSuccess ? "campaign_recipient_send_success" : "campaign_recipient_send_failed",
metadata: {
campaignId: campaign.id,
contactId: recipient.contactId,
attempt,
maxAttempts: recipient.maxSendAttempts,
provider: outboundProvider,
providerMessageId,
failureReason: outboundFailureReason ?? null,
retryAfter: nextRetryAt ? nextRetryAt.toISOString() : null,
source: params.source ?? "manual"
},
ipAddress: params.actorIpAddress,
userAgent: params.actorUserAgent
});
if (isSuccess) {
success += 1;
} else {
failed += 1;
}
}
await recalculateCampaignTotals(campaign.id);
await writeAuditTrail({
...auditContext,
entityType: "campaign",
entityId: campaign.id,
action: "campaign_dispatch_completed",
metadata: {
source: params.source ?? "manual",
candidateCount: processableRecipients.length,
success,
failed,
allSuccessful: failed === 0
}
});
return {
campaignId: campaign.id,
campaignName: campaign.name,
skippedReason: null,
seededRecipients,
processableRecipients: processableRecipients.length,
attempted: processableRecipients.length,
successful: success,
failed,
idle: false
};
}
async function acquireCampaignRetryLock(runId: string, ttlSeconds: number) {
const now = new Date();
const lockUntil = new Date(now.getTime() + ttlSeconds * 1000);
try {
await prisma.backgroundJobState.create({
data: {
jobName: CAMPAIGN_RETRY_JOB_NAME,
lockedBy: runId,
lockedUntil: lockUntil,
runs: 1,
lastRunStartedAt: now,
lastRunStatus: "running"
}
});
return lockUntil;
} catch {
const updated = await prisma.backgroundJobState.updateMany({
where: {
jobName: CAMPAIGN_RETRY_JOB_NAME,
OR: [{ lockedUntil: null }, { lockedUntil: { lte: now } }]
},
data: {
lockedBy: runId,
lockedUntil: lockUntil,
lastRunStartedAt: now,
lastRunStatus: "running",
lastRunCompletedAt: null,
lastError: null,
runs: { increment: 1 }
}
});
if (updated.count !== 1) {
return null;
}
return lockUntil;
}
}
async function releaseCampaignRetryLock(runId: string, summary: CampaignRetryBatchResult) {
await prisma.backgroundJobState.updateMany({
where: { jobName: CAMPAIGN_RETRY_JOB_NAME, lockedBy: runId },
data: {
lockedUntil: null,
lastRunCompletedAt: new Date(summary.runAt),
lastRunStatus: summary.status,
lastRunSummaryJson: safeStringify(summary),
consecutiveFailures: 0,
lastFailureAt: null
}
});
}
async function heartbeatCampaignRetryLock(runId: string, lockTtlSeconds: number) {
const lockUntil = new Date(Date.now() + lockTtlSeconds * 1000);
await prisma.backgroundJobState.updateMany({
where: { jobName: CAMPAIGN_RETRY_JOB_NAME, lockedBy: runId },
data: { lockedUntil: lockUntil }
});
return lockUntil;
}
export async function runCampaignRetryBatch(params: CampaignRetryBatchOptions): Promise<CampaignRetryBatchResult> {
const now = params.now ?? new Date();
const runId = `run-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
const recipientBatchSize = getBatchSize(params.recipientBatchSize, getPositiveIntFromEnv("CAMPAIGN_RETRY_BATCH_SIZE", DEFAULT_RETRIES_PER_RUN));
const maxCampaigns = getBatchSize(params.maxCampaigns, getPositiveIntFromEnv("CAMPAIGN_RETRY_MAX_CAMPAIGNS", DEFAULT_CAMPAIGNS_PER_RUN));
const lockTtlSeconds = getPositiveIntFromEnv("CAMPAIGN_RETRY_JOB_LOCK_TTL_SECONDS", DEFAULT_LOCK_TTL_SECONDS);
const lockUntil = await acquireCampaignRetryLock(runId, lockTtlSeconds);
if (!lockUntil) {
return {
runAt: new Date().toISOString(),
status: "skipped_locked",
processedCampaigns: 0,
totalSeeded: 0,
totalProcessable: 0,
totalAttempted: 0,
totalSuccessful: 0,
totalFailed: 0,
summaries: []
};
}
let heartbeatAt = lockUntil;
const runAt = new Date().toISOString();
try {
const where: Prisma.BroadcastCampaignWhereInput = {
OR: [
{ status: CampaignStatus.PROCESSING },
{
status: CampaignStatus.SCHEDULED,
scheduledAt: { not: null, lte: now }
}
],
...(params.tenantId ? { tenantId: params.tenantId } : {}),
NOT: { status: CampaignStatus.CANCELED }
};
if (params.campaignId) {
where.id = params.campaignId;
}
const campaigns = await prisma.broadcastCampaign.findMany({
where,
include: { channel: true, template: true },
orderBy: { createdAt: "asc" },
take: maxCampaigns
});
const summaries: CampaignDispatchResult[] = [];
let totalSeeded = 0;
let totalProcessable = 0;
let totalAttempted = 0;
let totalSuccessful = 0;
let totalFailed = 0;
for (const campaign of campaigns) {
if (!heartbeatAt || heartbeatAt <= new Date()) {
heartbeatAt = await heartbeatCampaignRetryLock(runId, lockTtlSeconds);
}
const summary = await dispatchCampaignById({
campaignId: campaign.id,
tenantId: campaign.tenantId,
actorUserId: params.actorUserId,
actorIpAddress: params.actorIpAddress,
actorUserAgent: params.actorUserAgent,
maxRecipientsPerCampaign: recipientBatchSize,
now,
enforceSchedule: true,
source: "scheduler"
});
summaries.push(summary);
totalSeeded += summary.seededRecipients;
totalProcessable += summary.processableRecipients;
totalAttempted += summary.attempted;
totalSuccessful += summary.successful;
totalFailed += summary.failed;
}
await heartbeatCampaignRetryLock(runId, lockTtlSeconds);
const result: CampaignRetryBatchResult = {
runAt,
status: "completed",
processedCampaigns: campaigns.length,
totalSeeded,
totalProcessable,
totalAttempted,
totalSuccessful,
totalFailed,
summaries
};
await releaseCampaignRetryLock(runId, result);
return result;
} catch (error) {
const message = error instanceof Error ? error.message : "Retry worker failed";
const result: CampaignRetryBatchResult = {
runAt,
status: "failed",
processedCampaigns: 0,
totalSeeded: 0,
totalProcessable: 0,
totalAttempted: 0,
totalSuccessful: 0,
totalFailed: 0,
summaries: [],
error: message
};
await prisma.backgroundJobState.updateMany({
where: { jobName: CAMPAIGN_RETRY_JOB_NAME, lockedBy: runId },
data: {
lastRunCompletedAt: new Date(runAt),
lastRunStatus: "failed",
lockedUntil: null,
lastError: message,
lastRunSummaryJson: safeStringify(result),
consecutiveFailures: { increment: 1 },
lastFailureAt: new Date()
}
});
await notifyCampaignRetryFailure({
jobName: CAMPAIGN_RETRY_JOB_NAME,
runId,
error: message,
runAt: runAt,
processedCampaigns: result.processedCampaigns,
totalAttempted: result.totalAttempted,
totalFailed: result.totalFailed
});
throw error;
}
}
export async function getCampaignRetryState() {
return prisma.backgroundJobState.findUnique({
where: { jobName: CAMPAIGN_RETRY_JOB_NAME },
select: {
jobName: true,
lockedUntil: true,
lastRunStartedAt: true,
lastRunCompletedAt: true,
lastRunStatus: true,
consecutiveFailures: true,
lastFailureAt: true,
lastError: true,
runs: true,
lastRunSummaryJson: true
}
});
}