This tutorial walks you through building a lead intake pipeline that intercepts web form, chat, or email submissions, classifies them with Mistral AI, scores the intent confidence through a threshold-based router, repairs any malformed LLM output, pushes the prioritized contact to ActiveCampaign, and traces the entire flow with Langfuse telemetry — all while enforcing a per-lead LLM spend budget.
You’ll use the @reaatech/* package family (confidence-router, agent-budget-engine, structured-repair-core, llm-cost-telemetry) alongside @mistralai/mistralai, activecampaign, langfuse, and zod inside a Next.js 16 App Router project.
Prerequisites
Node.js >= 22 and pnpm installed on your machine
A Mistral AI API key
An ActiveCampaign account with API access (API URL + API key)
A Langfuse account (public + secret keys)
Familiarity with TypeScript and Next.js App Router basics
Step 1: Scaffold the project
Create a new Next.js 16 project with the App Router and install all dependencies at exact pinned versions.
For testing, add msw (Mock Service Worker) as a dev dependency:
terminal
pnpm add -D msw@2.14.6
Expected output: A pnpm-lock.yaml file is generated and all pnpm add commands exit cleanly. The dependencies block in your package.json now lists every package at an exact semver version with no ^ or ~ prefixes.
Step 2: Configure environment variables
Open .env.example and replace its contents with the environment variables the pipeline needs. This file keeps placeholder values — never commit real secrets.
Copy this to .env.local and fill in your actual credentials:
terminal
cp .env.example .env.local
Expected output: .env.local exists with your real API keys. The LEAD_BUDGET_LIMIT of 0.05 caps LLM spend at $0.05 USD per lead. The LEAD_BUDGET_SOFT_CAP of 0.8 means that once 80% of that budget is consumed, the system auto-downgrades from mistral-large-latest to mistral-small-latest.
Step 3: Define types and schemas with Zod
Create the types module that defines the data shapes flowing through the pipeline. Every input, classification result, and output gets a Zod schema for runtime validation.
ts
// src/services/lead-types.ts
import { z } from "zod";
import { type CostSpan, type TelemetryContext } from "@reaatech/llm-cost-telemetry";

export const LeadSubmissionSchema = z.object({
  name: z.string().min(1),
  email: z.email(),
  company: z.string().optional(),
  message: z.string().min(1),
  source: z.enum(["web_form", "chat", "email"]),
});

export const LeadClassificationSchema = z.object({
  intent: z.enum(["high", "medium", "low"]),
  urgency: z.enum(["urgent", "normal"]),
  score: z.number().min(0).max(100),
  category: z.string().min(1),
  summary: z.string().min(1),
});

export const LeadIntakeResultSchema = z.object({
  contactId: z.string(),
  status: z.enum(["created", "updated", "skipped"]),
  score: z.number(),
  tags: z.array(z.string()),
  llmCostUsd: z.number(),
  processingTimeMs: z.number(),
});

export type LeadSubmission = z.infer<typeof LeadSubmissionSchema>;
export type LeadClassification = z.infer<typeof LeadClassificationSchema>;
export type LeadIntakeResult = z.infer<typeof LeadIntakeResultSchema>;

export { type CostSpan, type TelemetryContext };
Expected output: Three Zod schemas and their inferred TypeScript types. LeadSubmissionSchema validates incoming form payloads, LeadClassificationSchema validates the LLM classification output, and LeadIntakeResultSchema defines the shape returned to the API caller. Note the z.email() validator on the email field: in Zod 4, email validation is available as a top-level schema, which supersedes the z.string().email() chain from Zod 3.
Step 4: Create the configuration module
This module reads environment variables and provides typed configuration to the rest of the application. Three variables are mandatory: MISTRAL_API_KEY, ACTIVECAMPAIGN_API_URL, and ACTIVECAMPAIGN_API_KEY. The rest have sensible defaults.
ts
// src/lib/config.ts
export interface AppConfig {
  mistralApiKey: string;
  activeCampaignApiUrl: string;
  activeCampaignApiKey: string;
  langfusePublicKey: string;
  langfuseSecretKey: string;
  langfuseHost: string;
  leadBudgetLimit: number;
  leadBudgetSoftCap: number;
}

export function getConfig(): AppConfig {
  const mistralApiKey = process.env.MISTRAL_API_KEY;
  if (!mistralApiKey) throw new Error("MISSING_ENV: MISTRAL_API_KEY is required");

  const activeCampaignApiUrl = process.env.ACTIVECAMPAIGN_API_URL;
  if (!activeCampaignApiUrl) throw new Error("MISSING_ENV: ACTIVECAMPAIGN_API_URL is required");

  const activeCampaignApiKey = process.env.ACTIVECAMPAIGN_API_KEY;
  if (!activeCampaignApiKey) throw new Error("MISSING_ENV: ACTIVECAMPAIGN_API_KEY is required");

  return {
    mistralApiKey,
    activeCampaignApiUrl,
    activeCampaignApiKey,
    langfusePublicKey: process.env.LANGFUSE_PUBLIC_KEY ?? "",
    langfuseSecretKey: process.env.LANGFUSE_SECRET_KEY ?? "",
    langfuseHost: process.env.LANGFUSE_HOST ?? "https://cloud.langfuse.com",
    leadBudgetLimit: Number(process.env.LEAD_BUDGET_LIMIT) || 0.05,
    leadBudgetSoftCap: Number(process.env.LEAD_BUDGET_SOFT_CAP) || 0.8,
  };
}
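Expected output: A getConfig() function that throws a clear MISSING_ENV error if any required env var is missing. Optional vars fall back to sensible defaults. Because Number(undefined) is NaN and Number("") is 0, both of which are falsy, the || operator replaces unset and empty values with the default. An explicit 0 is replaced as well, so a zero budget cannot be configured this way.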
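The fallback behavior of `Number(value) || fallback` is easy to misread, so here is a minimal sketch of the idiom in isolation. The `readNumber` helper is hypothetical and exists only for illustration; the tutorial's getConfig() inlines the expression.

```typescript
// Hypothetical helper that isolates the `Number(value) || fallback` idiom.
function readNumber(raw: string | undefined, fallback: number): number {
  // Number(undefined) is NaN and Number("") is 0; both are falsy,
  // so `||` swaps them for the fallback.
  return Number(raw) || fallback;
}

const budgetExamples = {
  unset: readNumber(undefined, 0.05), // falls back
  empty: readNumber("", 0.05), // falls back
  invalid: readNumber("abc", 0.05), // NaN, falls back
  explicit: readNumber("0.10", 0.05), // parsed value is kept
  zero: readNumber("0", 0.05), // 0 is falsy, so it also falls back
};
console.log(budgetExamples);
```

If a literal 0 ever needs to be a valid setting, a check based on `Number.isFinite` would be one way to distinguish it from a missing value.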
Step 5: Implement the budget controller
The budget controller enforces a per-lead spend cap using @reaatech/agent-budget-engine. It wraps a SpendStore with a BudgetController, defines a budget with soft/hard caps and an auto-downgrade policy, and exposes functions to check and record spend.
ts
// src/lib/budget.ts
import { BudgetController } from "@reaatech/agent-budget-engine";
import { SpendStore } from "@reaatech/agent-budget-spend-tracker";
import { BudgetScope } from "@reaatech/agent-budget-types";
import { generateId, now, type CostSpan } from "@reaatech/llm-cost-telemetry";

export { BudgetController, type CostSpan };

export function createBudgetController(limit: number, softCap: number): BudgetController {
  const store = new SpendStore();
  const controller = new BudgetController({ spendTracker: store });

  controller.defineBudget({
    scopeType: BudgetScope.Task,
    scopeKey: "per-lead",
    limit,
    policy: {
      softCap,
      hardCap: 1.0,
      autoDowngrade: [{ from: ["mistral-large-latest"], to: "mistral-small-latest" }],
    },
  });

  controller.on("threshold-breach", (event) => {
    console.warn(`Budget threshold breached: ${String(event.threshold * 100)}% used for lead scoring`);
  });

  controller.on("hard-stop", (event) => {
    console.error(`Budget hard stop: ${event.scopeType}:${event.scopeKey} exhausted`);
  });

  return controller;
}

export function checkLeadBudget(
  controller: BudgetController,
  estimatedCost: number,
  modelId: string,
): { allowed: boolean; suggestedModel?: string } {
  const result = controller.check({
    scopeType: BudgetScope.Task,
    scopeKey: "per-lead",
    estimatedCost,
    modelId,
    tools: [],
  });
  return { allowed: result.allowed, suggestedModel: result.suggestedModel };
}

export function recordLeadSpend(
  controller: BudgetController,
  cost: number,
  modelId: string,
  inputTokens: number,
  outputTokens: number,
): void {
  controller.record({
    requestId: generateId(),
    scopeType: BudgetScope.Task,
    scopeKey: "per-lead",
    cost,
    inputTokens,
    outputTokens,
    modelId,
    provider: "mistral",
    timestamp: now(),
  });
}
Expected output: Three exports. createBudgetController sets up the budget with the limit and soft cap you pass in (0.05 and 0.8 with the default configuration), checkLeadBudget queries the controller before an LLM call and returns the downgraded model once the soft cap is hit, and recordLeadSpend logs the actual token usage and cost after the call. The event handlers call console.warn on a threshold-breach event (80% of the budget with the default soft cap) and console.error on a hard-stop event (100%).
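To make the soft-cap and hard-cap arithmetic concrete, the following dependency-free sketch shows one way such a check could behave. The rules here are assumptions for illustration (for example, that the projected spend rather than the current spend is compared against the caps); they are not the internals of @reaatech/agent-budget-engine, and `decideSpend` is a hypothetical name.

```typescript
// Sketch only: an assumed soft/hard cap rule, not the library's algorithm.
interface BudgetDecision {
  allowed: boolean;
  suggestedModel?: string;
}

function decideSpend(
  spentUsd: number,
  estimatedCostUsd: number,
  limitUsd: number,
  softCap: number, // fraction of the limit, for example 0.8
  modelId: string,
): BudgetDecision {
  const projected = spentUsd + estimatedCostUsd;
  // Hard cap: refuse a call that would push spend past the limit.
  if (projected > limitUsd) return { allowed: false };
  // Soft cap: allow the call but suggest the cheaper model.
  if (projected >= limitUsd * softCap && modelId === "mistral-large-latest") {
    return { allowed: true, suggestedModel: "mistral-small-latest" };
  }
  return { allowed: true };
}

// With a 0.05 limit and a 0.8 soft cap, the downgrade starts near $0.04.
console.log(decideSpend(0.03, 0.005, 0.05, 0.8, "mistral-large-latest"));
console.log(decideSpend(0.04, 0.005, 0.05, 0.8, "mistral-large-latest"));
console.log(decideSpend(0.049, 0.005, 0.05, 0.8, "mistral-large-latest"));
```

The three calls cover the three regions: under the soft cap, between the soft cap and the limit, and over the limit.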
Step 6: Set up Langfuse telemetry
Langfuse provides observability for every lead processed. This module wraps Langfuse SDK calls into a reusable helper that creates a trace per lead intake and records classification and ActiveCampaign spans.
Expected output: traceLeadIntake wraps any pipeline function in a Langfuse trace named "lead-intake" with the lead’s email and source attached as metadata. The finally block guarantees flushAsync() is called even if the pipeline throws, so no telemetry data is lost.
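The flush-in-finally guarantee described above can be sketched without the Langfuse SDK. The `FlushableClient` interface and the `withFlush` helper below are hypothetical stand-ins; the real client exposes a much larger API, and the tutorial's traceLeadIntake also creates the trace itself.

```typescript
// Hypothetical minimal shape; only the flush method matters for this sketch.
interface FlushableClient {
  flushAsync(): Promise<void>;
}

// Runs `run` and flushes telemetry afterwards, whether it resolved or threw.
async function withFlush<T>(client: FlushableClient, run: () => Promise<T>): Promise<T> {
  try {
    return await run();
  } finally {
    // Executes on both the success path and the error path.
    await client.flushAsync();
  }
}
```

Because the flush sits in `finally`, an exception from the pipeline still propagates to the caller, but only after buffered events have been sent.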
Step 7: Build the Mistral lead classifier
The classifier sends a lead submission to Mistral AI’s chat completions endpoint, asks it to classify the lead by intent, urgency, score, category, and summary, then passes the raw output through @reaatech/structured-repair-core’s repairOutput function. This catches malformed JSON, markdown-wrapped responses, truncated output, and mismatched keys — common LLM output ailments.
ts
// src/services/lead-classifier.ts
import { Mistral } from "@mistralai/mistralai";
import { repairOutput } from "@reaatech/structured-repair-core";
import { type LeadSubmission, type LeadClassification, LeadClassificationSchema } from "./lead-types.js";
import { getConfig } from "../lib/config.js";

export class ClassificationError extends Error {
  constructor(
    message: string,
    public readonly rawOutput: string,
    public readonly cause?: unknown,
  ) {
    super(message);
    this.name = "ClassificationError";
  }
}

export async function classifyLead(
  submission: LeadSubmission,
  model?: string,
): Promise<{ classification: LeadClassification; usage?: { inputTokens: number; outputTokens: number } }> {
  const config = getConfig();
  const mistral = new Mistral({ apiKey: config.mistralApiKey });

  const systemPrompt =
    "Classify the following lead submission by intent (high/medium/low), urgency (urgent/normal), overall score (0-100), category, and summary. Return ONLY valid JSON.";

  const userMessage = JSON.stringify({
    name: submission.name,
    email: submission.email,
    company: submission.company ?? undefined,
    message: submission.message,
    source: submission.source,
  });

  const result = await mistral.chat.complete({
    model: model ?? "mistral-large-latest",
    messages: [
      { role: "system", content: systemPrompt },
      { role: "user", content: userMessage },
    ],
  });

  // The SDK may return structured content chunks; only plain strings are repaired.
  const rawContentRaw = result.choices[0]?.message?.content;
  const rawContent = typeof rawContentRaw === "string" ? rawContentRaw : "";

  const repaired = repairOutput({
    schema: LeadClassificationSchema,
    input: rawContent,
  });

  if (!repaired.success) {
    throw new ClassificationError(
      "Failed to parse Mistral classification output",
      rawContent,
      repaired.errors,
    );
  }

  return {
    classification: repaired.data as LeadClassification,
    usage: {
      inputTokens: result.usage.promptTokens as number,
      outputTokens: result.usage.completionTokens as number,
    },
  };
}
Expected output: When the Mistral API returns valid JSON like {"intent":"high","urgency":"urgent","score":85,"category":"enterprise-sales","summary":"Hot lead"}, classifyLead returns a parsed LeadClassification plus token counts. If the response is prose, wrapped in markdown fences, or has hallucinated fields, repairOutput applies six graduated repair strategies automatically. If all strategies fail, a ClassificationError is thrown with the raw output attached for debugging.
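For intuition about what a repair pass has to do, here is a minimal sketch of a single strategy: strip a markdown fence, drop surrounding prose, and try to parse what remains. This is illustrative only. It is not how repairOutput is implemented, it covers just one of the failure modes listed above, and `extractJsonObject` is a hypothetical name.

```typescript
// Three backticks, built at runtime so this sample stays easy to embed.
const FENCE = "`".repeat(3);

// Illustrative single repair pass; returns null when nothing parseable is found.
function extractJsonObject(raw: string): unknown {
  let text = raw.trim();
  if (text.startsWith(FENCE)) {
    // Drop the opening fence line, including any language tag after it.
    const firstNewline = text.indexOf("\n");
    text = firstNewline === -1 ? "" : text.slice(firstNewline + 1);
  }
  if (text.endsWith(FENCE)) {
    text = text.slice(0, -FENCE.length);
  }
  // Keep only the outermost {...} span to discard leading or trailing prose.
  const start = text.indexOf("{");
  const end = text.lastIndexOf("}");
  if (start === -1 || end <= start) return null;
  try {
    return JSON.parse(text.slice(start, end + 1));
  } catch {
    return null;
  }
}

console.log(extractJsonObject(FENCE + 'json\n{"score":85}\n' + FENCE));
console.log(extractJsonObject('Sure! Here it is: {"intent":"high"} Hope that helps.'));
```

A parsed object would still need to pass LeadClassificationSchema before it can be trusted, which is why the tutorial hands both the schema and the raw text to the repair library.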
Step 8: Build the confidence-based lead scorer
The scorer uses @reaatech/confidence-router to map a classification score (0-100) to a routing decision. This determines whether the lead is high-priority (routed), medium-priority (needs clarification), or low-priority (fallback).
Expected output: A score of 85 (confidence 0.85) triggers ROUTE and returns { priority: "high", label: "high" }. A score of 20 (confidence 0.20) falls below the fallbackThreshold of 0.3 and triggers FALLBACK — { priority: "low", label: "low" }. A score of 50 (confidence 0.50) sits between the two thresholds and triggers CLARIFY — { priority: "medium", label: "medium" }.
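The threshold logic behind these three outcomes can be sketched in plain TypeScript. This is not the @reaatech/confidence-router API. The 0.3 fallback threshold comes from the text above, while the 0.7 route threshold is an assumption chosen so that 85 routes and 50 asks for clarification; `decideRoute` and `scoreToPriority` are hypothetical names.

```typescript
type RoutingDecision = "ROUTE" | "CLARIFY" | "FALLBACK";
type Priority = "high" | "medium" | "low";

const FALLBACK_THRESHOLD = 0.3; // stated in the tutorial text
const ROUTE_THRESHOLD = 0.7; // assumption: the actual value is not shown

function decideRoute(score: number): RoutingDecision {
  const confidence = score / 100; // map the 0-100 score onto 0-1
  if (confidence >= ROUTE_THRESHOLD) return "ROUTE";
  if (confidence < FALLBACK_THRESHOLD) return "FALLBACK";
  return "CLARIFY";
}

function scoreToPriority(score: number): { priority: Priority; label: Priority } {
  const byDecision: Record<RoutingDecision, Priority> = {
    ROUTE: "high",
    CLARIFY: "medium",
    FALLBACK: "low",
  };
  const priority = byDecision[decideRoute(score)];
  return { priority, label: priority };
}

console.log(scoreToPriority(85), scoreToPriority(50), scoreToPriority(20));
```

Keeping the thresholds as named constants makes it straightforward to tune the boundary between automatic routing and manual review later.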
Step 9: Build the ActiveCampaign service
This service handles all communication with ActiveCampaign’s API. It finds or creates contacts, sets custom field values for the lead score, and adds priority and category tags.
ts
// src/services/activecampaign-service.ts
import ActiveCampaign from "activecampaign";
import { getConfig } from "../lib/config.js";

export type { ActiveCampaign };

export const SCORE_FIELD_ID = "1";

export const PRIORITY_TAG_MAP: Record<string, string> = {
  high: "1",
  medium: "2",
  low: "3",
};

export class ActiveCampaignError extends Error {
  constructor(
    message: string,
Expected output: The pushLeadToActiveCampaign function idempotently upserts a contact: it looks up by email first, creates if missing, then sets the score field, adds a priority tag ("1" for high, "2" for medium, "3" for low), and adds the classification category as a second tag. Every ac.api() call is wrapped in try/catch and throws a typed ActiveCampaignError on failure.
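The find-then-create upsert and the tag selection can be illustrated with an in-memory stand-in for the CRM. Everything below is a hypothetical sketch: `InMemoryContacts` and `tagsForLead` are invented names, and lowercasing the email before lookup is an assumption that the tutorial's code does not make.

```typescript
type UpsertStatus = "created" | "updated";

// Mirrors the priority-to-tag mapping quoted in the text.
const PRIORITY_TAGS: Record<"high" | "medium" | "low", string> = {
  high: "1",
  medium: "2",
  low: "3",
};

class InMemoryContacts {
  private idsByEmail = new Map<string, string>();
  private nextId = 1;

  // Look up by email first and create only when nothing is found.
  upsert(email: string): { id: string; status: UpsertStatus } {
    const key = email.trim().toLowerCase(); // assumption: case-insensitive match
    const existing = this.idsByEmail.get(key);
    if (existing !== undefined) return { id: existing, status: "updated" };
    const id = String(this.nextId++);
    this.idsByEmail.set(key, id);
    return { id, status: "created" };
  }
}

// Priority tag first, classification category second.
function tagsForLead(priority: "high" | "medium" | "low", category: string): string[] {
  return [PRIORITY_TAGS[priority], category];
}

const contacts = new InMemoryContacts();
console.log(contacts.upsert("ada@example.com"), contacts.upsert("ada@example.com"));
console.log(tagsForLead("high", "enterprise-sales"));
```

Submitting the same email twice yields the same contact ID, which is the property that makes retries safe.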
You also need type declarations for the activecampaign package, which has minimal TypeScript support. Create this file:
Step 10: Build the orchestrator
The orchestrator ties everything together. It lazy-initializes the budget controller, Langfuse client, confidence router, and ActiveCampaign client at module scope (so they’re reused across requests), then runs the full pipeline inside a Langfuse trace.
ts
// src/services/lead-intake-service.ts
import { calculateCostFromTokens } from "@reaatech/llm-cost-telemetry";
import { classifyLead, ClassificationError } from "./lead-classifier.js";
import { scoreLead, createConfidenceRouter, type ConfidenceRouter } from "./lead-scorer.js";
import {
  pushLeadToActiveCampaign,
  createActiveCampaignClient,
  ActiveCampaignError,
  type ActiveCampaign,
} from "./activecampaign-service.js";
import { type BudgetController, createBudgetController, checkLeadBudget, recordLeadSpend } from "../lib/budget.js";
import {
  createLangfuseClient,
  traceLeadIntake,
  createClassificationSpan,
  createActiveCampaignSpan,
  type Langfuse,
} from "../lib/telemetry.js";
import { type LeadSubmission, type
Expected output: processLead runs the full pipeline: budget check → Mistral classification → structured repair → confidence scoring → ActiveCampaign push → Langfuse telemetry. Classification errors and ActiveCampaign errors are each caught separately with typed instanceof checks, returning a graceful LeadIntakeResult with status: "skipped" rather than throwing an unhandled exception. Unknown errors also return a skipped result, so a failure in any single stage never escapes processLead as an unhandled exception.
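The error-handling shape described above, typed `instanceof` checks that all converge on a skipped result, might look like the following sketch. The class names, the `reason` field, and `runStage` are hypothetical; the tutorial's own ClassificationError and ActiveCampaignError carry extra fields, and its LeadIntakeResult has a different shape.

```typescript
class ClassificationFailure extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ClassificationFailure";
    // Keeps instanceof working even when compiled to an ES5 target.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

class CrmFailure extends Error {
  constructor(message: string) {
    super(message);
    this.name = "CrmFailure";
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

interface SkippedResult {
  status: "skipped";
  reason: "classification" | "crm" | "unknown";
  detail: string;
}

// Maps any thrown value to a skipped result instead of letting it escape.
function toSkippedResult(err: unknown): SkippedResult {
  if (err instanceof ClassificationFailure) {
    return { status: "skipped", reason: "classification", detail: err.message };
  }
  if (err instanceof CrmFailure) {
    return { status: "skipped", reason: "crm", detail: err.message };
  }
  const detail = err instanceof Error ? err.message : String(err);
  return { status: "skipped", reason: "unknown", detail };
}

// Runs one pipeline stage and converts failures to a skipped result.
function runStage<T>(stage: () => T): T | SkippedResult {
  try {
    return stage();
  } catch (err) {
    return toSkippedResult(err);
  }
}
```

Ordering the specific checks before the generic fallback keeps the reason precise while still guaranteeing that every thrown value is handled.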
Step 11: Create the API route
The API route at POST /api/lead-intake receives lead submissions, validates them with Zod, and delegates to the orchestrator. It uses NextRequest and NextResponse (not bare Request/Response) to follow Next.js 16 conventions.
Expected output: The route accepts POST requests with a JSON body matching LeadSubmissionSchema. A valid submission returns 200 with the lead intake result. Invalid bodies get 422 with Zod error details. Classification failures return 502. Unknown errors return 500. Only the named POST export is used — no default export.
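The status-code contract above can be captured as a small pure function, which is easy to unit test separately from the route handler. This is a sketch under assumptions: the `IntakeOutcome` union and `statusFor` are hypothetical names, and the tutorial's route may derive the status directly from caught errors instead.

```typescript
// Hypothetical outcome type covering the four cases named in the text.
type IntakeOutcome =
  | { kind: "ok" }
  | { kind: "invalid_body" }
  | { kind: "classification_failed" }
  | { kind: "unknown_error" };

function statusFor(outcome: IntakeOutcome): number {
  switch (outcome.kind) {
    case "ok":
      return 200; // valid submission, result returned
    case "invalid_body":
      return 422; // body failed schema validation
    case "classification_failed":
      return 502; // upstream LLM output could not be used
    case "unknown_error":
      return 500; // anything else
  }
}

console.log(statusFor({ kind: "invalid_body" }));
```

Using a discriminated union means the compiler flags any new outcome kind that lacks a status mapping.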
Step 12: Run the tests
The test suite validates every layer: schemas, config, budget enforcement, lead classification (with MSW-mocked Mistral API), confidence scoring, ActiveCampaign service, full pipeline integration, and the API route.
Run the full test suite:
terminal
pnpm test
Expected output: All 59 tests pass across 8 test files. Your terminal should show output similar to:
The pnpm test script also runs coverage with 90% thresholds on lines, branches, functions, and statements. Coverage is reported in the ./coverage/ directory and as vitest-report.json.
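A Vitest configuration that produces this behavior could look roughly like the sketch below. The tutorial's actual config file is not shown, so the file name, the v8 coverage provider (which needs the @vitest/coverage-v8 package), and the JSON reporter wiring are all assumptions.

```typescript
// vitest.config.ts (sketch; the tutorial's real configuration is not shown)
import { defineConfig } from "vitest/config";

export default defineConfig({
  test: {
    // Assumption: the JSON reporter is what writes vitest-report.json.
    reporters: ["default", "json"],
    outputFile: { json: "./vitest-report.json" },
    coverage: {
      provider: "v8", // assumption: requires @vitest/coverage-v8
      reportsDirectory: "./coverage",
      // The run fails if any metric drops below 90%.
      thresholds: { lines: 90, branches: 90, functions: 90, statements: 90 },
    },
  },
});
```

With thresholds set in the config, the coverage gate applies in CI as well as in local runs.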
Finally, run the type checker and linter:
terminal
pnpm typecheck
pnpm lint
Expected output: Both exit with code 0 and no errors.
Next steps
Add multi-channel intake — Extend the pipeline to accept webhook payloads from Slack, HubSpot, or Zapier by normalizing each source to the LeadSubmission shape.
Deploy as a serverless function — Configure the Next.js app for Vercel or AWS Lambda. Add src/instrumentation.ts for warm-start optimizations; the instrumentation file is stable as of Next.js 15, so the experimental.instrumentationHook flag is no longer needed.
Build a lead dashboard — Create a Next.js page at app/leads/page.tsx that queries ActiveCampaign via their GET /api/3/contacts endpoint and displays scored leads sorted by priority.
Add human-in-the-loop clarification — When the confidence router returns CLARIFY (medium priority), forward the lead to a Slack webhook for manual review instead of auto-creating the contact.
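As a starting point for the multi-channel idea in the list above, a normalizer converts each source's payload into the LeadSubmission shape before it enters the pipeline. The `ChatWebhookPayload` shape below is entirely hypothetical and does not match any real Slack, HubSpot, or Zapier format; `LeadSubmissionShape` restates the fields of the Step 3 schema so the sketch stays dependency-free.

```typescript
// Restates the fields of LeadSubmissionSchema from Step 3.
interface LeadSubmissionShape {
  name: string;
  email: string;
  company?: string;
  message: string;
  source: "web_form" | "chat" | "email";
}

// Hypothetical inbound payload; real webhook formats will differ.
interface ChatWebhookPayload {
  user: { display_name: string; email: string };
  text: string;
  org?: string;
}

function normalizeChatPayload(payload: ChatWebhookPayload): LeadSubmissionShape {
  const submission: LeadSubmissionShape = {
    name: payload.user.display_name.trim(),
    email: payload.user.email.trim(),
    message: payload.text.trim(),
    source: "chat",
  };
  // Only set company when the payload actually carries one.
  if (payload.org !== undefined && payload.org.trim() !== "") {
    submission.company = payload.org.trim();
  }
  return submission;
}

console.log(
  normalizeChatPayload({
    user: { display_name: " Ada Lovelace ", email: "ada@example.com" },
    text: "We need pricing for 200 seats.",
    org: "Analytical Engines",
  }),
);
```

The normalized object should still be validated with LeadSubmissionSchema, since a normalizer can only reshape data and cannot guarantee its validity.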
    // (continuation of src/services/activecampaign-service.ts from Step 9)
    public readonly operation: string,
    public readonly cause?: unknown,
  ) {
    super(message);
    this.name = "ActiveCampaignError";
  }
}

export function createActiveCampaignClient(): ActiveCampaign {
  const config = getConfig();
  return new ActiveCampaign(config.activeCampaignApiUrl, config.activeCampaignApiKey);
}

export async function findContactByEmail(
  ac: ActiveCampaign,
  email: string,
): Promise<{ id: string } | null> {
  try {
    const result = await ac.api("contact/view?email=" + encodeURIComponent(email), {});
    if (result.success && result.contact?.id) {
      return { id: String(result.contact.id) };
    }
    return null;
  } catch {
    return null;
  }
}

export async function createContact(
  ac: ActiveCampaign,
  name: string,
  email: string,
  company?: string,
): Promise<{ id: string }> {
  const nameParts = name.split(" ");
  const firstName = nameParts[0] ?? "";
  const lastName = nameParts.slice(1).join(" ");
  try {
    const result = await ac.api("contact/add", {
      email,
      firstName,
      lastName,
      company: company ?? "",
    });
    if (!result.success) {
      throw new ActiveCampaignError("Failed to create contact", "contact/add", result);