Files
BizOne-portal/backend/src/webhooks/webhooks.service.ts

393 lines
12 KiB
TypeScript

import { createHash, timingSafeEqual } from 'node:crypto';
import { Prisma } from '@prisma/client';
import {
  BadRequestException,
  Injectable,
  NotFoundException,
  UnauthorizedException,
} from '@nestjs/common';
import { getAppConfig } from '../config/env';
import { AuthenticatedUser } from '../auth/auth.types';
import { ConversationsService } from '../conversations/conversations.service';
import { JobsService } from '../jobs/jobs.service';
import { defaultWhatsappSubscriptions } from '../integrations/whatsapp-subscriptions';
import { normalizePhoneNumber } from '../common/normalize';
import { PrismaService } from '../prisma/prisma.service';
import {
  normalizeWebhookPayload,
  verifyMetaSignature,
} from './webhooks.utils';
import type {
  NormalizedWebhookEvent,
  WebhookHeaders,
  WebhookReceiveSummary,
} from './webhooks.types';
/**
 * Verifies, stores, queues and processes inbound provider webhooks.
 *
 * Flow visible in this class:
 *  1. `receive` authenticates the request, normalizes the payload into events,
 *     stores one `webhookEvent` row per event and enqueues a `webhook.process`
 *     job for every event whose type is in the configured subscriptions.
 *  2. `processJob` is the worker entry point; it resolves the job, processes the
 *     referenced event and reports completion / retry / failure to `JobsService`.
 *  3. `retryEvent` lets an operator requeue a stored event manually.
 */
@Injectable()
export class WebhooksService {
  /** Page size used by `findAll` when the caller passes no usable limit. */
  private static readonly DEFAULT_LIST_LIMIT = 50;
  /** Upper bound on the page size accepted by `findAll`. */
  private static readonly MAX_LIST_LIMIT = 100;

  constructor(
    private readonly prisma: PrismaService,
    private readonly jobsService: JobsService,
    private readonly conversationsService: ConversationsService,
  ) {}

  /**
   * Answers a webhook subscription verification (challenge) request.
   *
   * @param mode - must be `'subscribe'`.
   * @param token - verify token sent by the caller.
   * @param challenge - value echoed back on success.
   * @returns the challenge, or `''` when none was supplied.
   * @throws BadRequestException when `mode` is not `'subscribe'`.
   * @throws UnauthorizedException when the token does not match, or when no
   *   verify token is configured at all.
   */
  async verifyChallenge(mode?: string, token?: string, challenge?: string) {
    const config = await this.getEffectiveWhatsappConfig();
    if (mode !== 'subscribe') {
      throw new BadRequestException('Unsupported webhook verification mode');
    }

    const expectedToken = config.webhookVerifyToken;
    // An empty configured token must never authenticate anyone: without this
    // guard a request carrying no token would match an empty configured value.
    if (
      typeof expectedToken !== 'string' ||
      expectedToken.length === 0 ||
      !this.secretsMatch(token ?? '', expectedToken)
    ) {
      throw new UnauthorizedException('Invalid webhook verify token');
    }
    return challenge || '';
  }

  /**
   * Authenticates an incoming webhook request, stores every normalized event
   * and queues the subscribed ones for background processing.
   *
   * @param provider - provider name from the route; lower-cased in the summary.
   * @param payload - parsed request body, handed to `normalizeWebhookPayload`.
   * @param headers - request headers used for signature / shared-secret checks.
   * @param rawBody - unparsed body bytes, required for Meta signature checks.
   * @returns counts of queued, duplicate and ignored events plus all event ids.
   * @throws UnauthorizedException when the request cannot be verified.
   */
  async receive(
    provider: string,
    payload: unknown,
    headers: WebhookHeaders,
    rawBody?: Buffer,
  ): Promise<WebhookReceiveSummary> {
    // Load the effective config once and share it with the verification step
    // instead of querying the integration config twice per request.
    const config = await this.getEffectiveWhatsappConfig();
    const verification = await this.verifyWebhookRequest(provider, rawBody, headers, config);
    const normalizedEvents = normalizeWebhookPayload(provider, payload);

    const counts = { queued: 0, duplicate: 0, ignored: 0 };
    // Events are persisted sequentially so that a repeated event id inside one
    // payload is reported as a duplicate of the earlier insert.
    for (const event of normalizedEvents) {
      const outcome = await this.persistNormalizedEvent(
        event,
        verification.verified,
        config.subscriptions.includes(event.eventType),
      );
      counts[outcome] += 1;
    }

    return {
      provider: provider.toLowerCase(),
      verified: verification.verified,
      verification: verification.reason,
      received: normalizedEvents.length,
      queued: counts.queued,
      duplicates: counts.duplicate,
      ignored: counts.ignored,
      eventIds: normalizedEvents.map((event) => event.eventId),
    };
  }

  /**
   * Lists stored webhook events, newest first.
   *
   * @param limit - requested page size; clamped to 1..100. `NaN` falls back to
   *   the default of 50 and fractional values are truncated, because the raw
   *   value would otherwise be passed to Prisma's `take` unchanged.
   * @param provider - optional exact provider filter (blank means no filter).
   * @param status - optional exact processing-status filter (blank means no filter).
   */
  findAll(limit = 50, provider?: string, status?: string) {
    const requested = Number.isNaN(limit) ? WebhooksService.DEFAULT_LIST_LIMIT : limit;
    const take = Math.trunc(
      Math.min(Math.max(requested, 1), WebhooksService.MAX_LIST_LIMIT),
    );

    return this.prisma.webhookEvent.findMany({
      where: {
        provider: provider?.trim() || undefined,
        processingStatus: status?.trim() || undefined,
      },
      orderBy: { createdAt: 'desc' },
      take,
    });
  }

  /**
   * Manually requeues a stored webhook event and, when a user is supplied,
   * records the action in the audit log.
   *
   * @param eventId - id of the stored webhook event.
   * @param user - authenticated actor; when omitted no audit entry is written.
   * @param ipAddress - caller address stored on the audit entry (or `null`).
   * @returns `{ eventId, status: 'queued' }`.
   * @throws NotFoundException when no event with that id exists.
   */
  async retryEvent(eventId: string, user?: AuthenticatedUser, ipAddress?: string) {
    const event = await this.prisma.webhookEvent.findUnique({
      where: { eventId },
    });
    if (!event) {
      throw new NotFoundException('Webhook event not found');
    }

    // A single update is already atomic; no interactive transaction is needed.
    await this.prisma.webhookEvent.update({
      where: { eventId },
      data: {
        processingStatus: 'queued',
        processingNotes: 'Manually requeued from retry endpoint',
      },
    });
    await this.enqueueProcessingJob(eventId);

    if (user) {
      // Prefer the stored user record; fall back to the token claims when the
      // user row cannot be found.
      const actor = await this.prisma.user.findUnique({
        where: { id: user.sub },
        select: { id: true, name: true, email: true },
      });
      await this.prisma.auditLog.create({
        data: {
          actorUserId: actor?.id || user.sub,
          actorName: actor?.name || user.email,
          actorEmail: actor?.email || user.email,
          actionType: 'Webhook Retry Queued',
          module: 'Webhooks',
          ipAddress: ipAddress || null,
          severity: 'default',
          details: `Manually requeued webhook event ${eventId}.`,
        },
      });
    }

    return {
      eventId,
      status: 'queued',
    };
  }

  /**
   * Worker entry point for a `webhook.process` job.
   *
   * Silently returns when the job no longer exists. Any processing error is
   * reported to `JobsService`: as a retry (delay `2000 * attempts` ms) while
   * attempts remain, otherwise as a failure. Errors are not rethrown.
   *
   * @param jobId - id of the job to process.
   */
  async processJob(jobId: string) {
    const job = await this.jobsService.findById(jobId);
    if (!job) {
      return;
    }

    try {
      const payload = job.payloadJson as { eventId?: string };
      const eventId = payload?.eventId;
      // The payload is stored JSON, so check the runtime type rather than
      // trusting the cast above.
      if (typeof eventId !== 'string' || !eventId) {
        throw new Error('Webhook job payload is missing eventId');
      }
      await this.processSingleEvent(eventId);
      await this.jobsService.complete(jobId);
    } catch (error) {
      const message = error instanceof Error ? error.message : 'Unknown processing error';
      if (job.attempts < job.maxAttempts) {
        await this.jobsService.retry(jobId, message, 2000 * job.attempts);
      } else {
        await this.jobsService.fail(jobId, message);
      }
    }
  }

  /**
   * Stores one normalized event and queues it when its type is subscribed.
   *
   * @param event - normalized event to store.
   * @param verified - whether the originating request was authenticated.
   * @param isSubscribed - whether the event type is in the active subscriptions.
   * @returns `'duplicate'` when the insert hits a unique constraint (P2002),
   *   `'ignored'` when stored but not subscribed, otherwise `'queued'`.
   */
  private async persistNormalizedEvent(
    event: NormalizedWebhookEvent,
    verified: boolean,
    isSubscribed: boolean,
  ): Promise<'queued' | 'duplicate' | 'ignored'> {
    // Only the insert is guarded by the duplicate check, so that a P2002 raised
    // while enqueueing the job is not misreported as a duplicate event.
    try {
      await this.prisma.webhookEvent.create({
        data: {
          provider: event.provider,
          eventId: event.eventId,
          eventType: event.eventType,
          senderPhone: event.senderPhone,
          recipientPhone: event.recipientPhone,
          externalMessageId: event.externalMessageId,
          eventTimestamp: event.eventTimestamp,
          payloadJson: event.payload as Prisma.InputJsonValue,
          verified,
          processingStatus: isSubscribed ? 'queued' : 'ignored',
          processingNotes: isSubscribed
            ? 'Queued for webhook worker'
            : 'Ignored because event subscription is disabled',
        },
      });
    } catch (error) {
      if (
        error instanceof Prisma.PrismaClientKnownRequestError &&
        error.code === 'P2002'
      ) {
        return 'duplicate';
      }
      throw error;
    }

    if (!isSubscribed) {
      return 'ignored';
    }

    // NOTE(review): if enqueueing fails here the event row stays 'queued' with
    // no job, and a provider redelivery would be reported as a duplicate. Such
    // events can be recovered through `retryEvent`; consider whether the insert
    // and the enqueue should share a transaction.
    await this.enqueueProcessingJob(event.eventId);
    return 'queued';
  }

  /** Enqueues the background job that processes the stored event `eventId`. */
  private async enqueueProcessingJob(eventId: string) {
    await this.jobsService.enqueue({
      queueName: 'webhooks',
      jobType: 'webhook.process',
      payload: { eventId },
      maxAttempts: 3,
    });
  }

  /**
   * Authenticates a webhook request. Checks are tried in this order:
   *  1. Meta signature (`x-hub-signature-256`) when the provider is `meta` and
   *     an app secret is configured;
   *  2. shared secret (`x-webhook-secret`) when that header is present;
   *  3. unsigned requests, only when `allowUnsigned` is enabled.
   *
   * @param config - effective config; loaded on demand when omitted.
   * @returns whether the request was verified and which mechanism decided it.
   * @throws UnauthorizedException when a check fails or none applies.
   */
  private async verifyWebhookRequest(
    provider: string,
    rawBody: Buffer | undefined,
    headers: WebhookHeaders,
    config?: Awaited<ReturnType<WebhooksService['getEffectiveWhatsappConfig']>>,
  ) {
    const effectiveConfig = config ?? (await this.getEffectiveWhatsappConfig());
    const normalizedProvider = provider.toLowerCase();
    const metaSignature = this.readHeader(headers['x-hub-signature-256']);
    const genericSecret = this.readHeader(headers['x-webhook-secret']);

    if (normalizedProvider === 'meta' && effectiveConfig.appSecret) {
      if (!rawBody || !metaSignature) {
        throw new UnauthorizedException('Missing meta webhook signature');
      }
      // NOTE(review): the return value is ignored, so this relies on
      // `verifyMetaSignature` throwing on a mismatch — confirm in webhooks.utils.
      verifyMetaSignature(rawBody, metaSignature, effectiveConfig.appSecret);
      return { verified: true, reason: 'meta-signature' };
    }

    if (genericSecret) {
      const expectedSecret = effectiveConfig.sharedSecret;
      if (
        typeof expectedSecret !== 'string' ||
        !this.secretsMatch(genericSecret, expectedSecret)
      ) {
        throw new UnauthorizedException('Invalid webhook shared secret');
      }
      return { verified: true, reason: 'shared-secret' };
    }

    if (effectiveConfig.allowUnsigned) {
      return { verified: false, reason: 'unsigned-development-request' };
    }

    throw new UnauthorizedException('Webhook request could not be verified');
  }

  /**
   * Compares two secrets without leaking, through timing, how many leading
   * characters matched. Both values are hashed first so `timingSafeEqual`
   * always receives buffers of equal length.
   */
  private secretsMatch(provided: string, expected: string): boolean {
    const digest = (value: string) =>
      createHash('sha256').update(value, 'utf8').digest();
    return timingSafeEqual(digest(provided), digest(expected));
  }

  /**
   * Builds the effective WhatsApp webhook settings: string values stored under
   * the `whatsapp` integration config override the environment defaults.
   * `allowUnsigned` always comes from the environment.
   */
  private async getEffectiveWhatsappConfig() {
    const env = getAppConfig();
    const stored = await this.prisma.integrationConfig.findUnique({
      where: { configKey: 'whatsapp' },
    });
    const storedJson = (stored?.configJson as Record<string, unknown> | null) ?? {};

    const stringOr = <T>(value: unknown, fallback: T) =>
      typeof value === 'string' ? value : fallback;

    // Filter before checking for emptiness: a stored list holding only
    // non-string entries would otherwise yield [] and ignore every event.
    const storedSubscriptions = Array.isArray(storedJson.subscriptions)
      ? storedJson.subscriptions.filter(
          (item): item is string => typeof item === 'string',
        )
      : [];

    return {
      webhookVerifyToken: stringOr(storedJson.webhookVerifyToken, env.webhookVerifyToken),
      sharedSecret: stringOr(storedJson.sharedSecret, env.webhookSharedSecret),
      appSecret: stringOr(storedJson.appSecret, env.metaWebhookAppSecret),
      allowUnsigned: env.webhookAllowUnsigned,
      subscriptions:
        storedSubscriptions.length > 0 ? storedSubscriptions : defaultWhatsappSubscriptions,
    };
  }

  /** Returns a header's value, taking the first entry of a multi-value header. */
  private readHeader(value: string | string[] | undefined) {
    return Array.isArray(value) ? value[0] : value;
  }

  /**
   * Applies the side effects of one stored event and marks it processed.
   *
   *  - `message.inbound` with a sender: upserts a contact by normalized phone
   *    number and forwards the message to `ConversationsService`.
   *  - `message.sent|delivered|read|failed` with an external message id:
   *    updates the status of the matching conversation messages.
   *
   * Events already marked `processed` are skipped.
   *
   * @throws Error when the event does not exist.
   */
  private async processSingleEvent(eventId: string) {
    const event = await this.prisma.webhookEvent.findUnique({
      where: { eventId },
    });
    if (!event) {
      throw new Error(`Webhook event ${eventId} not found`);
    }
    if (event.processingStatus === 'processed') {
      return;
    }

    const isInboundWithSender =
      event.eventType === 'message.inbound' && Boolean(event.senderPhone);

    if (event.eventType === 'message.inbound' && event.senderPhone) {
      const normalizedPhone = normalizePhoneNumber(event.senderPhone);
      // Empty `update` keeps an existing contact untouched; only unknown
      // senders get a new record.
      const contact = await this.prisma.contact.upsert({
        where: { phoneNumber: normalizedPhone },
        update: {},
        create: {
          name: normalizedPhone,
          phoneNumber: normalizedPhone,
          notes: `Created from inbound webhook event ${event.eventId}`,
        },
      });
      await this.conversationsService.syncInboundFromWebhookEvent({
        webhookEventId: event.eventId,
        contactId: contact.id,
        externalMessageId: event.externalMessageId,
        body: this.extractMessageBody(event.payloadJson),
        occurredAt: event.eventTimestamp,
      });
    }

    const deliveryStatus = this.deliveryStatusFor(event.eventType);
    if (deliveryStatus && event.externalMessageId) {
      // NOTE(review): the status is overwritten unconditionally, so a late
      // 'sent'/'delivered' event processed after 'read' would regress the
      // message status — confirm whether ordering needs to be enforced.
      await this.prisma.conversationMessage.updateMany({
        where: {
          externalMessageId: event.externalMessageId,
        },
        data: {
          status: deliveryStatus,
          ...(deliveryStatus === 'read' ? { readAt: event.eventTimestamp } : {}),
        },
      });
    }

    await this.prisma.webhookEvent.update({
      where: { eventId },
      data: {
        processingStatus: 'processed',
        processingNotes: isInboundWithSender
          ? 'Processed and synced inbound sender into contacts'
          : 'Processed by webhook worker',
      },
    });
  }

  /**
   * Maps a delivery-status event type to the message status it sets, or
   * `undefined` for every other event type.
   */
  private deliveryStatusFor(eventType: string) {
    switch (eventType) {
      case 'message.sent':
        return 'sent' as const;
      case 'message.delivered':
        return 'delivered' as const;
      case 'message.read':
        return 'read' as const;
      case 'message.failed':
        return 'failed' as const;
      default:
        return undefined;
    }
  }

  /**
   * Picks a display body for an inbound message from its stored payload.
   *
   * Returns the first non-blank string among `text.body`, `body`, `caption`,
   * `interactive.title` and `[type]`; falls back to a generic placeholder when
   * the payload is not a plain object or none of them is usable.
   */
  private extractMessageBody(payload: unknown) {
    const fallback = 'Inbound message received.';
    const record = this.asRecord(payload);
    if (!record) {
      return fallback;
    }

    const text = this.asRecord(record.text);
    const interactive = this.asRecord(record.interactive);
    const candidates: unknown[] = [
      text?.body,
      record.body,
      record.caption,
      interactive?.title,
      typeof record.type === 'string' ? `[${record.type}]` : null,
    ];

    for (const candidate of candidates) {
      if (typeof candidate === 'string' && candidate.trim()) {
        return candidate;
      }
    }
    return fallback;
  }

  /** Narrows `value` to a plain (non-array, non-null) object, else `null`. */
  private asRecord(value: unknown): Record<string, unknown> | null {
    if (!value || typeof value !== 'object' || Array.isArray(value)) {
      return null;
    }
    return value as Record<string, unknown>;
  }
}