import { CampaignAudienceType, CampaignStatus, DeliveryStatus, OptInStatus, type Prisma } from "@prisma/client"; import { DEFAULT_MAX_SEND_ATTEMPTS, getRetryDelaySeconds, recalculateCampaignTotals, renderCampaignMessage } from "@/lib/campaign-utils"; import { writeAuditTrail } from "@/lib/audit"; import { prisma } from "@/lib/prisma"; import { sendOutboundTextMessage } from "@/lib/whatsapp-provider"; import { notifyCampaignRetryFailure } from "@/lib/job-alerts"; const DEFAULT_RETRIES_PER_RUN = 100; const DEFAULT_CAMPAIGNS_PER_RUN = 20; const DEFAULT_LOCK_TTL_SECONDS = 300; const CAMPAIGN_RETRY_JOB_NAME = "campaign-retry-worker"; type CampaignAudienceContact = { id: string; fullName: string; phoneNumber: string; }; type CampaignRecipientWithContact = Prisma.CampaignRecipientGetPayload<{ include: { contact: { select: { id: true; fullName: true; phoneNumber: true } } }; }>; type CampaignDispatchCampaignSeed = Prisma.BroadcastCampaignGetPayload<{ include: { channel: true; template: true } }>; type CampaignDispatchResult = { campaignId: string; campaignName: string; skippedReason: string | null; seededRecipients: number; processableRecipients: number; attempted: number; successful: number; failed: number; idle: boolean; }; type CampaignRetryBatchOptions = { tenantId?: string; campaignId?: string; actorUserId?: string | null; actorIpAddress?: string | null; actorUserAgent?: string | null; now?: Date; recipientBatchSize?: number; maxCampaigns?: number; }; type CampaignRetryBatchResult = { runAt: string; status: "completed" | "skipped_locked" | "failed"; processedCampaigns: number; totalSeeded: number; totalProcessable: number; totalAttempted: number; totalSuccessful: number; totalFailed: number; summaries: CampaignDispatchResult[]; error?: string; }; export async function loadCampaignAudienceContacts(tenantId: string, campaign: { audienceType: CampaignAudienceType; segmentId: string | null; }) { if (campaign.audienceType === CampaignAudienceType.SEGMENT && campaign.segmentId) { const segmentMembers = await prisma.segmentMember.findMany({ where: { segmentId: campaign.segmentId, tenantId }, include: { contact: { select: { id: true, fullName: true, phoneNumber: true } } } }); return segmentMembers .filter((member) => member.contact.phoneNumber.trim().length > 0) .map((member) => ({ id: member.contact.id, fullName: member.contact.fullName, phoneNumber: member.contact.phoneNumber })) as CampaignAudienceContact[]; } const contacts = await prisma.contact.findMany({ where: { tenantId, optInStatus: { not: OptInStatus.OPTED_OUT } }, orderBy: { fullName: "asc" }, select: { id: true, fullName: true, phoneNumber: true } }); return contacts .filter((contact) => contact.phoneNumber.trim().length > 0) .map((contact) => ({ id: contact.id, fullName: contact.fullName, phoneNumber: contact.phoneNumber })); } function isRecipientDueForRetry(recipient: CampaignRecipientWithContact, now: Date) { if (recipient.sendAttempts >= recipient.maxSendAttempts) { return false; } if (!recipient.nextRetryAt) { return true; } return recipient.nextRetryAt <= now; } function mapRecipientTemplateValues(contact: CampaignAudienceContact) { return { "1": contact.fullName, "2": contact.phoneNumber }; } function getCampaignBackoffDate(now: Date, attempt: number) { const delayInSeconds = getRetryDelaySeconds(attempt); return new Date(now.getTime() + delayInSeconds * 1000); } function getBatchSize(raw: number | undefined, fallback: number) { if (!raw || raw <= 0) { return fallback; } if (!Number.isInteger(raw)) { return fallback; } return raw; } function getPositiveIntFromEnv(key: string, fallback: number) { const raw = process.env[key]?.trim(); const value = raw ? Number(raw) : fallback; if (!Number.isInteger(value) || value <= 0) { return fallback; } return value; } function safeStringify(value: unknown) { try { return JSON.stringify(value); } catch { return null; } } async function upsertCampaignRecipientsSeed(campaign: CampaignDispatchCampaignSeed, tenantId: string) { const audienceContacts = await loadCampaignAudienceContacts(tenantId, { audienceType: campaign.audienceType, segmentId: campaign.segmentId }); const uniqueContacts = new Map(); for (const item of audienceContacts) { if (!uniqueContacts.has(item.id) && item.phoneNumber) { uniqueContacts.set(item.id, item); } } return { uniqueContacts, payload: Array.from(uniqueContacts.values()).map((contact) => ({ tenantId, campaignId: campaign.id, contactId: contact.id, phoneNumber: contact.phoneNumber, sendStatus: DeliveryStatus.QUEUED, sendAttempts: 0, maxSendAttempts: DEFAULT_MAX_SEND_ATTEMPTS })) }; } export async function dispatchCampaignById(params: { campaignId: string; tenantId: string; actorUserId?: string | null; actorIpAddress?: string | null; actorUserAgent?: string | null; maxRecipientsPerCampaign?: number; now?: Date; enforceSchedule?: boolean; source?: "manual" | "scheduler"; }): Promise { const now = params.now ?? new Date(); const limit = getBatchSize(params.maxRecipientsPerCampaign, DEFAULT_RETRIES_PER_RUN); const campaign = await prisma.broadcastCampaign.findFirst({ where: { id: params.campaignId, tenantId: params.tenantId }, include: { channel: true, template: true } }); if (!campaign) { return { campaignId: params.campaignId, campaignName: "-", skippedReason: "campaign_not_found", seededRecipients: 0, processableRecipients: 0, attempted: 0, successful: 0, failed: 0, idle: true }; } if (campaign.status === CampaignStatus.CANCELED) { return { campaignId: campaign.id, campaignName: campaign.name, skippedReason: "campaign_not_ready", seededRecipients: 0, processableRecipients: 0, attempted: 0, successful: 0, failed: 0, idle: true }; } if (params.enforceSchedule && campaign.scheduledAt && campaign.scheduledAt > now) { return { campaignId: campaign.id, campaignName: campaign.name, skippedReason: "scheduled_not_ready", seededRecipients: 0, processableRecipients: 0, attempted: 0, successful: 0, failed: 0, idle: true }; } const campaignStatusSeed = campaign.scheduledAt && campaign.scheduledAt <= now ? CampaignStatus.PROCESSING : campaign.status; if (campaignStatusSeed !== campaign.status) { await prisma.broadcastCampaign.update({ where: { id: campaign.id }, data: { status: campaignStatusSeed, startedAt: now } }); campaign.status = campaignStatusSeed; } const existingRecipientCount = await prisma.campaignRecipient.count({ where: { campaignId: campaign.id } }); const auditContext = { tenantId: params.tenantId, actorUserId: params.actorUserId, ipAddress: params.actorIpAddress, userAgent: params.actorUserAgent }; let seededRecipients = 0; if (existingRecipientCount === 0) { const { uniqueContacts, payload } = await upsertCampaignRecipientsSeed(campaign, params.tenantId); if (payload.length === 0) { return { campaignId: campaign.id, campaignName: campaign.name, skippedReason: "no_recipients", seededRecipients: 0, processableRecipients: 0, attempted: 0, successful: 0, failed: 0, idle: true }; } await prisma.$transaction([ prisma.campaignRecipient.createMany({ data: payload }), prisma.broadcastCampaign.update({ where: { id: campaign.id }, data: { totalRecipients: payload.length, totalSent: 0, totalDelivered: 0, totalRead: 0, totalFailed: 0, status: campaignStatusSeed, startedAt: now } }) ]); await writeAuditTrail({ tenantId: params.tenantId, actorUserId: params.actorUserId, entityType: "campaign", entityId: campaign.id, action: "campaign_recipients_seeded", metadata: { totalRecipients: payload.length, audienceType: campaign.audienceType, uniqueContactCount: uniqueContacts.size }, ipAddress: params.actorIpAddress, userAgent: params.actorUserAgent }); seededRecipients = payload.length; } const retryCandidates = await prisma.campaignRecipient.findMany({ where: { campaignId: campaign.id, sendStatus: { in: [DeliveryStatus.QUEUED, DeliveryStatus.FAILED] }, OR: [{ nextRetryAt: null }, { nextRetryAt: { lte: now } }] }, include: { contact: { select: { id: true, fullName: true, phoneNumber: true } } }, orderBy: { createdAt: "asc" }, take: limit }); const processableRecipients = retryCandidates.filter((recipient) => isRecipientDueForRetry(recipient, now)); if (processableRecipients.length === 0) { await recalculateCampaignTotals(campaign.id); return { campaignId: campaign.id, campaignName: campaign.name, skippedReason: null, seededRecipients, processableRecipients: 0, attempted: 0, successful: 0, failed: 0, idle: true }; } let success = 0; let failed = 0; await writeAuditTrail({ ...auditContext, entityType: "campaign", entityId: campaign.id, action: "campaign_dispatch_started", metadata: { source: params.source ?? "manual", campaignId: campaign.id, candidateCount: processableRecipients.length } }); for (const recipient of processableRecipients) { const attemptAt = new Date(); let outboundFailureReason: string | null = null; let outboundProvider = "unknown"; let providerMessageId: string | null = recipient.providerMessageId; let isSuccess = false; try { const renderedBody = renderCampaignMessage( campaign.template.bodyText, mapRecipientTemplateValues({ id: recipient.contact.id, fullName: recipient.contact.fullName, phoneNumber: recipient.contact.phoneNumber }) ); const outbound = await sendOutboundTextMessage({ tenantId: params.tenantId, channelId: campaign.channel.id, channelProvider: campaign.channel.provider, phoneNumberId: campaign.channel.phoneNumberId, to: recipient.phoneNumber, content: renderedBody, messageId: recipient.id }); outboundFailureReason = outbound.failureReason ?? null; outboundProvider = outbound.provider; providerMessageId = outbound.providerMessageId ?? providerMessageId; isSuccess = outbound.success; } catch (error) { const reason = error instanceof Error ? error.message : "Unknown send error"; outboundFailureReason = reason; isSuccess = false; } const attempt = recipient.sendAttempts + 1; const hasRetry = attempt < recipient.maxSendAttempts && !isSuccess; const nextRetryAt = hasRetry ? getCampaignBackoffDate(attemptAt, attempt) : null; await prisma.campaignRecipient.update({ where: { id: recipient.id }, data: { sendAttempts: attempt, lastAttemptAt: attemptAt, failureReason: outboundFailureReason, providerMessageId, sendStatus: isSuccess ? DeliveryStatus.SENT : hasRetry ? DeliveryStatus.QUEUED : DeliveryStatus.FAILED, sentAt: isSuccess && !recipient.sentAt ? attemptAt : recipient.sentAt, nextRetryAt } }); await writeAuditTrail({ ...auditContext, entityType: "campaign_recipient", entityId: recipient.id, action: isSuccess ? "campaign_recipient_send_success" : "campaign_recipient_send_failed", metadata: { campaignId: campaign.id, contactId: recipient.contactId, attempt, maxAttempts: recipient.maxSendAttempts, provider: outboundProvider, providerMessageId, failureReason: outboundFailureReason ?? null, retryAfter: nextRetryAt ? nextRetryAt.toISOString() : null, source: params.source ?? "manual" }, ipAddress: params.actorIpAddress, userAgent: params.actorUserAgent }); if (isSuccess) { success += 1; } else { failed += 1; } } await recalculateCampaignTotals(campaign.id); await writeAuditTrail({ ...auditContext, entityType: "campaign", entityId: campaign.id, action: "campaign_dispatch_completed", metadata: { source: params.source ?? "manual", candidateCount: processableRecipients.length, success, failed, allSuccessful: failed === 0 } }); return { campaignId: campaign.id, campaignName: campaign.name, skippedReason: null, seededRecipients, processableRecipients: processableRecipients.length, attempted: processableRecipients.length, successful: success, failed, idle: false }; } async function acquireCampaignRetryLock(runId: string, ttlSeconds: number) { const now = new Date(); const lockUntil = new Date(now.getTime() + ttlSeconds * 1000); try { await prisma.backgroundJobState.create({ data: { jobName: CAMPAIGN_RETRY_JOB_NAME, lockedBy: runId, lockedUntil: lockUntil, runs: 1, lastRunStartedAt: now, lastRunStatus: "running" } }); return lockUntil; } catch { const updated = await prisma.backgroundJobState.updateMany({ where: { jobName: CAMPAIGN_RETRY_JOB_NAME, OR: [{ lockedUntil: null }, { lockedUntil: { lte: now } }] }, data: { lockedBy: runId, lockedUntil: lockUntil, lastRunStartedAt: now, lastRunStatus: "running", lastRunCompletedAt: null, lastError: null, runs: { increment: 1 } } }); if (updated.count !== 1) { return null; } return lockUntil; } } async function releaseCampaignRetryLock(runId: string, summary: CampaignRetryBatchResult) { await prisma.backgroundJobState.updateMany({ where: { jobName: CAMPAIGN_RETRY_JOB_NAME, lockedBy: runId }, data: { lockedUntil: null, lastRunCompletedAt: new Date(summary.runAt), lastRunStatus: summary.status, lastRunSummaryJson: safeStringify(summary), consecutiveFailures: 0, lastFailureAt: null } }); } async function heartbeatCampaignRetryLock(runId: string, lockTtlSeconds: number) { const lockUntil = new Date(Date.now() + lockTtlSeconds * 1000); await prisma.backgroundJobState.updateMany({ where: { jobName: CAMPAIGN_RETRY_JOB_NAME, lockedBy: runId }, data: { lockedUntil: lockUntil } }); return lockUntil; } export async function runCampaignRetryBatch(params: CampaignRetryBatchOptions): Promise { const now = params.now ?? new Date(); const runId = `run-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; const recipientBatchSize = getBatchSize(params.recipientBatchSize, getPositiveIntFromEnv("CAMPAIGN_RETRY_BATCH_SIZE", DEFAULT_RETRIES_PER_RUN)); const maxCampaigns = getBatchSize(params.maxCampaigns, getPositiveIntFromEnv("CAMPAIGN_RETRY_MAX_CAMPAIGNS", DEFAULT_CAMPAIGNS_PER_RUN)); const lockTtlSeconds = getPositiveIntFromEnv("CAMPAIGN_RETRY_JOB_LOCK_TTL_SECONDS", DEFAULT_LOCK_TTL_SECONDS); const lockUntil = await acquireCampaignRetryLock(runId, lockTtlSeconds); if (!lockUntil) { return { runAt: new Date().toISOString(), status: "skipped_locked", processedCampaigns: 0, totalSeeded: 0, totalProcessable: 0, totalAttempted: 0, totalSuccessful: 0, totalFailed: 0, summaries: [] }; } let heartbeatAt = lockUntil; const runAt = new Date().toISOString(); try { const where: Prisma.BroadcastCampaignWhereInput = { OR: [ { status: CampaignStatus.PROCESSING }, { status: CampaignStatus.SCHEDULED, scheduledAt: { not: null, lte: now } } ], ...(params.tenantId ? { tenantId: params.tenantId } : {}), NOT: { status: CampaignStatus.CANCELED } }; if (params.campaignId) { where.id = params.campaignId; } const campaigns = await prisma.broadcastCampaign.findMany({ where, include: { channel: true, template: true }, orderBy: { createdAt: "asc" }, take: maxCampaigns }); const summaries: CampaignDispatchResult[] = []; let totalSeeded = 0; let totalProcessable = 0; let totalAttempted = 0; let totalSuccessful = 0; let totalFailed = 0; for (const campaign of campaigns) { if (!heartbeatAt || heartbeatAt <= new Date()) { heartbeatAt = await heartbeatCampaignRetryLock(runId, lockTtlSeconds); } const summary = await dispatchCampaignById({ campaignId: campaign.id, tenantId: campaign.tenantId, actorUserId: params.actorUserId, actorIpAddress: params.actorIpAddress, actorUserAgent: params.actorUserAgent, maxRecipientsPerCampaign: recipientBatchSize, now, enforceSchedule: true, source: "scheduler" }); summaries.push(summary); totalSeeded += summary.seededRecipients; totalProcessable += summary.processableRecipients; totalAttempted += summary.attempted; totalSuccessful += summary.successful; totalFailed += summary.failed; } await heartbeatCampaignRetryLock(runId, lockTtlSeconds); const result: CampaignRetryBatchResult = { runAt, status: "completed", processedCampaigns: campaigns.length, totalSeeded, totalProcessable, totalAttempted, totalSuccessful, totalFailed, summaries }; await releaseCampaignRetryLock(runId, result); return result; } catch (error) { const message = error instanceof Error ? error.message : "Retry worker failed"; const result: CampaignRetryBatchResult = { runAt, status: "failed", processedCampaigns: 0, totalSeeded: 0, totalProcessable: 0, totalAttempted: 0, totalSuccessful: 0, totalFailed: 0, summaries: [], error: message }; await prisma.backgroundJobState.updateMany({ where: { jobName: CAMPAIGN_RETRY_JOB_NAME, lockedBy: runId }, data: { lastRunCompletedAt: new Date(runAt), lastRunStatus: "failed", lockedUntil: null, lastError: message, lastRunSummaryJson: safeStringify(result), consecutiveFailures: { increment: 1 }, lastFailureAt: new Date() } }); await notifyCampaignRetryFailure({ jobName: CAMPAIGN_RETRY_JOB_NAME, runId, error: message, runAt: runAt, processedCampaigns: result.processedCampaigns, totalAttempted: result.totalAttempted, totalFailed: result.totalFailed }); throw error; } } export async function getCampaignRetryState() { return prisma.backgroundJobState.findUnique({ where: { jobName: CAMPAIGN_RETRY_JOB_NAME }, select: { jobName: true, lockedUntil: true, lastRunStartedAt: true, lastRunCompletedAt: true, lastRunStatus: true, consecutiveFailures: true, lastFailureAt: true, lastError: true, runs: true, lastRunSummaryJson: true } }); }