An AI intake system that processes documents, emails, and chat messages to capture leads and auto-populate HubSpot CRM with accurate, categorized data.
This recipe builds an AI lead intake agent that processes business cards, PDFs, and emails to capture leads and auto-populate HubSpot CRM. It uses Cohere’s NLP models for extraction and classification, routes high-confidence leads straight into HubSpot, and flags low-confidence ones for human review via Slack. Budget enforcement caps monthly API spend per organization so costs stay predictable for SMBs.
Slack bot token (SLACK_TOKEN) and a channel for review notifications (SLACK_VERIFICATION_CHANNEL)
Langfuse keys (LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY) for observability
Basic familiarity with TypeScript and REST APIs
Step 1: Install dependencies and configure environment
Start from the project root. Copy the env example and fill in your keys:
```bash
cp .env.example .env
```
Your .env needs at minimum:
```env
COHERE_API_KEY=
HUBSPOT_ACCESS_TOKEN=
SLACK_TOKEN=
SLACK_VERIFICATION_CHANNEL=
LANGFUSE_SECRET_KEY=
LANGFUSE_PUBLIC_KEY=
LANGFUSE_HOST=https://cloud.langfuse.com
BUDGET_DEFAULT_MONTHLY_LIMIT=
```
Install all packages:
```bash
pnpm install
```
Expected output: pnpm resolves all packages including cohere-ai, @hubspot/api-client, tesseract.js, pdf-parse, langfuse, @slack/web-api, zod, and the @reaatech/* scoped packages.
Step 2: Explore the source layout
The core business logic lives under src/lib/. Each module has a single responsibility:
| File | Responsibility |
| --- | --- |
| src/lib/types.ts | All TypeScript interfaces and Zod schemas for leads, pipeline results, and extracted fields |
| src/lib/cohere-client.ts | Singleton Cohere client (CohereClientV2) configured with COHERE_API_KEY |
| src/lib/document.ts | Document text extraction (PDF via pdf-parse, images via tesseract.js), text chunking, and field extraction via Cohere chat |
| src/lib/classifier.ts | Intent classification via @reaatech/agent-mesh-classifier + confidence routing via @reaatech/confidence-router |
| src/lib/notify.ts | Slack notifications for manual review via @slack/web-api |
| src/lib/hubspot.ts | HubSpot contact/company CRUD via @hubspot/api-client |
| src/lib/budget.ts | Budget lifecycle management via @reaatech/agent-budget-engine |
| src/lib/handoff.ts | Agent handoff orchestration (escalation + retry) via @reaatech/agent-handoff |
| src/lib/langfuse.ts | Observability traces via langfuse |
| src/lib/store.ts | In-memory lead store (module-level, lives as long as the server process) |
API routes live under app/api/:
| Route | Purpose |
| --- | --- |
| app/api/leads/route.ts | POST ingests files or email webhooks; GET is a health check; OPTIONS handles CORS |
| app/api/leads/[id]/route.ts | GET returns lead status and fields by ID |
Step 3: Define types and schemas
Open src/lib/types.ts — this is where every lead shape is declared:
LeadRecordSchema enforces confidence as a number between 0 and 1. ExtractedFieldsSchema requires a valid email and makes everything else optional — extraction may not find every field.
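The schema listing itself is not reproduced here, so the sketch below restates those two rules in plain TypeScript to make them easy to see at a glance. It is not the recipe's Zod code: the optional field names (firstName, lastName, company, phone) and the helpers isValidConfidence and parseExtractedFields are illustrative assumptions, and the email check is a deliberately loose pattern rather than Zod's own.

```typescript
// Illustrative shape only: every field name except email is an assumption.
export interface ExtractedFieldsSketch {
  email: string;
  firstName?: string;
  lastName?: string;
  company?: string;
  phone?: string;
}

// Loose email check; Zod's .email() applies its own, stricter pattern.
const EMAIL_PATTERN = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;

// Mirrors the LeadRecordSchema rule: confidence must be a number in [0, 1].
// NaN fails both comparisons, so it is rejected too.
export function isValidConfidence(value: unknown): value is number {
  return typeof value === "number" && value >= 0 && value <= 1;
}

// Mirrors the ExtractedFieldsSchema rule: email is required and valid;
// every other field is optional but must be a string when present.
export function parseExtractedFields(input: unknown): ExtractedFieldsSketch | null {
  if (typeof input !== "object" || input === null) return null;
  const record = input as Record<string, unknown>;
  const email = record.email;
  if (typeof email !== "string" || !EMAIL_PATTERN.test(email)) return null;

  const result: ExtractedFieldsSketch = { email };
  for (const key of ["firstName", "lastName", "company", "phone"] as const) {
    const value = record[key];
    if (value === undefined) continue; // missing optional field is fine
    if (typeof value !== "string") return null; // wrong type is not
    result[key] = value;
  }
  return result;
}
```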
Step 4: Initialize the Cohere client
Create src/lib/cohere-client.ts:
```ts
import { CohereClientV2 } from "cohere-ai";

export const cohereClient = new CohereClientV2({
  token: process.env.COHERE_API_KEY,
});
```

The key is passed explicitly because the SDK's built-in environment fallback reads CO_API_KEY, not COHERE_API_KEY. All downstream code imports this singleton rather than creating new clients.
Step 5: Build document extraction and field parsing
Open src/lib/document.ts. This module handles every extraction step from raw file bytes to structured fields:
```ts
import { createWorker } from "tesseract.js";
import { PDFParse } from "pdf-parse";
import { cohereClient } from "./cohere-client.js";
import type { Document } from "@reaatech/hybrid-rag";
import type { ExtractedFields } from "./types.js";
import { ExtractedFieldsSchema } from "./types.js";

export async function extractTextFromImage(
  imageBuffer: Buffer,
  language?: string,
): Promise<string> {
  const worker = await createWorker(language
```
The extractLeadsFromFile function chains three steps: detect the MIME type and extract raw text, send that text to Cohere for structured field extraction, and wrap the result in a Document object from @reaatech/hybrid-rag.
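The middle step hinges on turning Cohere's chat reply into an object that ExtractedFieldsSchema can validate. The recipe's prompt and reply handling are not shown, so this sketch assumes the model was asked for a single JSON object and may wrap it in a Markdown code fence; parseJsonReply is a hypothetical helper.

```typescript
// Hypothetical helper: pulls one JSON object out of a chat model's text reply.
// Assumes the prompt asked for a single object, possibly wrapped in a
// triple-backtick fence with an optional "json" tag.
export function parseJsonReply(reply: string): Record<string, unknown> | null {
  let text = reply.trim();
  // Strip a surrounding Markdown fence if the model added one.
  const fenced = text.match(/^`{3}(?:json)?\s*([\s\S]*?)\s*`{3}$/);
  if (fenced) text = fenced[1] ?? "";
  try {
    const parsed: unknown = JSON.parse(text);
    // Only plain objects count; arrays and primitives are rejected.
    if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) {
      return parsed as Record<string, unknown>;
    }
    return null;
  } catch {
    // Malformed JSON: the caller can treat this as a low-confidence lead.
    return null;
  }
}
```

Returning null instead of throwing keeps a bad model reply on the same path as a low-confidence extraction, which the pipeline already knows how to escalate.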
Step 6: Classify intent and route by confidence
Create src/lib/classifier.ts. This wires up the agent registry, calls classifierService.classify, and then uses ConfidenceRouter to decide whether to proceed or escalate:
```ts
import { classifierService } from "@reaatech/agent-mesh-classifier";
import { ConfidenceRouter } from "@reaatech/confidence-router";

const leadIntakeRegistry = [
  {
    agent_id: "book_demo",
    display_name: "Book Demo",
    description: "Handles requests to schedule a product demo",
    endpoint: "lead-intake://book_demo",
    type: "mcp" as const,
    is_default: false,
    confidence_threshold: 0.7,
    clarification_required: false,
    examples: ["I want to see a demo", "Schedule a demo call"],
  },
  {
    agent_id: "pricing_inquiry",
    display_name: "Pricing Inquiry",
    description: "Handles questions about pricing and plans",
    endpoint: "lead-intake://pricing_inquiry",
    type: "mcp" as const,
    is_default: false,
    confidence_threshold: 0.7,
    clarification_required: false,
    examples: ["How much does it cost?", "What are your pricing plans?"],
  },
  {
    agent_id: "support_request",
    display_name: "Support Request",
    description: "Handles technical support and help requests",
    endpoint: "lead-intake://support_request",
    type: "mcp" as const,
    is_default: false,
    confidence_threshold: 0.7,
    clarification_required: false,
    examples: ["I need help with login", "My account is broken"],
  },
  {
    agent_id: "general_contact",
    display_name: "General Contact",
    description: "Handles general inquiries and messages",
    endpoint: "lead-intake://general_contact",
    type: "mcp" as const,
    is_default: true,
    confidence_threshold: 0.7,
    clarification_required: false,
    examples: ["I want to speak to someone", "Contact me"],
  },
];

export async function classifyLeadInput(
  rawText: string,
): Promise<{ agent_id: string; confidence: number; intent_summary: string }> {
  const output = await classifierService.classify(rawText, leadIntakeRegistry);
  return {
    agent_id: output.agent_id,
    confidence: output.confidence,
    intent_summary: output.intent_summary,
  };
}

const router = new ConfidenceRouter({
  routeThreshold: 0.8,
  fallbackThreshold: 0.3,
  clarificationEnabled: false,
});

export function routeClassification(
  agent_id: string,
  confidence: number,
): { decisionType: string; target: string } {
  const decision = router.decide({
    predictions: [{ label: agent_id, confidence }],
  });
  return {
    decisionType: decision.type,
    target: decision.target ?? "",
  };
}

export async function classifyAndRoute(
  rawText: string,
): Promise<{
  classification: { agent_id: string; confidence: number; intent_summary: string };
  decision: { decisionType: string; target: string };
}> {
  const classification = await classifyLeadInput(rawText);
  const decision = routeClassification(
    classification.agent_id,
    classification.confidence,
  );
  return { classification, decision };
}
```
With clarificationEnabled: false, the router only returns ROUTE (confidence >= 0.8) or FALLBACK (confidence < 0.3). Anything between 0.3 and 0.8 hits neither threshold and falls through — those get sent to Slack for manual review in the route handler.
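The three outcomes can be captured in one small function, which is handy for unit-testing the route handler's branching. This is a sketch of the thresholds described above, not the ConfidenceRouter API; the RouteAction names are hypothetical, and it assumes the boundaries fall exactly as stated (ROUTE at 0.8 and above, FALLBACK below 0.3).

```typescript
// Hypothetical action names for the three branches in the route handler.
export type RouteAction = "route_to_hubspot" | "fallback" | "manual_review";

const ROUTE_THRESHOLD = 0.8; // same value as routeThreshold above
const FALLBACK_THRESHOLD = 0.3; // same value as fallbackThreshold above

export function decideAction(confidence: number): RouteAction {
  if (confidence >= ROUTE_THRESHOLD) return "route_to_hubspot";
  if (confidence < FALLBACK_THRESHOLD) return "fallback";
  // 0.3 <= confidence < 0.8: neither threshold applies, so a human reviews it in Slack.
  return "manual_review";
}
```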
Step 7: Send Slack notifications
Create src/lib/notify.ts. Notifications must never block the pipeline — errors are caught and logged:
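The catch-and-log rule fits in a tiny wrapper around whatever function actually posts to Slack. notifySafely and the SendFn type are hypothetical; in the recipe the send function would wrap a call such as WebClient's chat.postMessage from @slack/web-api, which is not reproduced here.

```typescript
// Any async function that delivers a message; in practice it would wrap chat.postMessage.
type SendFn = (text: string) => Promise<unknown>;

// Hypothetical wrapper: never rejects, so a Slack outage cannot break the pipeline.
export async function notifySafely(send: SendFn, text: string): Promise<boolean> {
  try {
    await send(text);
    return true;
  } catch (err) {
    // Log and carry on; the caller decides what to do with a false result.
    console.error("Manual-review notification failed:", err);
    return false;
  }
}
```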
Open src/lib/hubspot.ts. The pushLeadToHubspot function is the end-to-end sync: search for existing contact, update if found, otherwise create and associate the company:
```ts
import { Client } from "@hubspot/api-client";
import type { LeadRecord } from "./types.js";

const hubspotClient = new Client({
  accessToken: process.env.HUBSPOT_ACCESS_TOKEN ?? "",
});

export async function createContact(
  lead: LeadRecord,
): Promise<{ contactId: string; email: string }> {
  try {
    const result = await hubspotClient.crm.contacts.basicApi.create({
      properties: {
        firstname: lead.firstName,
        lastname: lead.lastName,
```
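The search, update-or-create, associate flow is easier to test when the HubSpot calls sit behind a small interface. The ContactGateway interface, its method names, LeadSketch, and upsertLead are hypothetical stand-ins, not @hubspot/api-client methods; in the recipe they would map onto the contacts search, update, and create endpoints plus a company association.

```typescript
// Hypothetical gateway over the HubSpot calls the text describes.
export interface ContactGateway {
  findContactIdByEmail(email: string): Promise<string | null>;
  updateContact(contactId: string, props: Record<string, string>): Promise<void>;
  createContact(props: Record<string, string>): Promise<string>;
  associateCompany(contactId: string, companyName: string): Promise<void>;
}

// Trimmed stand-in for LeadRecord.
export interface LeadSketch {
  email: string;
  firstName?: string;
  lastName?: string;
  company?: string;
}

// Search by email; update if found, otherwise create and associate the company.
export async function upsertLead(
  gateway: ContactGateway,
  lead: LeadSketch,
): Promise<{ contactId: string; created: boolean }> {
  // HubSpot's default contact property names; empty values are left out.
  const props: Record<string, string> = { email: lead.email };
  if (lead.firstName) props.firstname = lead.firstName;
  if (lead.lastName) props.lastname = lead.lastName;

  const existingId = await gateway.findContactIdByEmail(lead.email);
  if (existingId !== null) {
    await gateway.updateContact(existingId, props);
    return { contactId: existingId, created: false };
  }

  const contactId = await gateway.createContact(props);
  if (lead.company) await gateway.associateCompany(contactId, lead.company);
  return { contactId, created: true };
}
```

Keying the lookup on email means a second submission from the same person updates one contact instead of creating a duplicate.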
Step 9: Enforce monthly budgets
Create src/lib/budget.ts. The BudgetController from @reaatech/agent-budget-engine manages the full lifecycle. You wire up an in-memory spend store and a Cohere pricing provider:
```ts
import { BudgetController } from "@reaatech/agent-budget-engine";

enum BudgetScope {
  User = "User",
  Organization = "Organization",
}

export class InMemorySpendStore {
  private store = new Map<
    string,
    { totalSpent: number; inputTokens: number; outputTokens: number; state: string }
  >();

  record(entry: {
    scopeType: string;
    scopeKey: string;
    cost: number;
    inputTokens: number;
    outputTokens:
```
The hard-stop event listener fires when an org hits its monthly cap — it logs a warning so you can route an alert through your monitoring stack.
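The recipe delegates this lifecycle to BudgetController, whose API is only partly shown. As a library-independent illustration of the same idea, the sketch below tracks per-organization spend keyed by UTC calendar month, reports a Degraded state near the cap, and fires a hard-stop callback once the cap is reached. MonthlyBudget, the 80% degrade ratio, and the state names are assumptions.

```typescript
export type BudgetState = "Active" | "Degraded" | "HardStop";

// Hypothetical in-memory tracker; spend is keyed by org and UTC month, e.g. "acme:2025-06".
export class MonthlyBudget {
  private readonly spend = new Map<string, number>();
  private readonly monthlyLimit: number;
  private readonly degradeRatio: number;
  private readonly onHardStop: (orgId: string, spent: number) => void;

  constructor(
    monthlyLimit: number,
    onHardStop: (orgId: string, spent: number) => void,
    degradeRatio = 0.8, // assumed: degrade at 80% of the cap
  ) {
    this.monthlyLimit = monthlyLimit;
    this.onHardStop = onHardStop;
    this.degradeRatio = degradeRatio;
  }

  private key(orgId: string, now: Date): string {
    const month = String(now.getUTCMonth() + 1).padStart(2, "0");
    return `${orgId}:${now.getUTCFullYear()}-${month}`;
  }

  // State for the month containing `now`, based on spend recorded so far.
  state(orgId: string, now = new Date()): BudgetState {
    const spent = this.spend.get(this.key(orgId, now)) ?? 0;
    if (spent >= this.monthlyLimit) return "HardStop";
    if (spent >= this.monthlyLimit * this.degradeRatio) return "Degraded";
    return "Active";
  }

  // Call after each API request with its actual cost.
  record(orgId: string, cost: number, now = new Date()): BudgetState {
    const k = this.key(orgId, now);
    const before = this.state(orgId, now);
    this.spend.set(k, (this.spend.get(k) ?? 0) + cost);
    const after = this.state(orgId, now);
    // Fire the callback only on the transition into HardStop, not on every later call.
    if (after === "HardStop" && before !== "HardStop") {
      this.onHardStop(orgId, this.spend.get(k) ?? 0);
    }
    return after;
  }
}
```

A value such as BUDGET_DEFAULT_MONTHLY_LIMIT could be passed as monthlyLimit, and the onHardStop callback is where a console.warn or monitoring alert would go.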
Step 10: Orchestrate handoff and escalation
Create src/lib/handoff.ts. The handoff layer emits typed events and wraps HubSpot calls with retry logic:
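Retry logic of this kind usually means re-running the HubSpot call with a growing delay between attempts. The sketch below is a generic exponential-backoff helper, not the @reaatech/agent-handoff API; withRetry, RetryOptions, and the default values are assumptions.

```typescript
export interface RetryOptions {
  attempts: number; // total tries, including the first
  baseDelayMs: number; // delay before the second try; doubles after each failure
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: runs fn until it succeeds or attempts run out, then rethrows the last error.
export async function withRetry<T>(
  fn: () => Promise<T>,
  { attempts, baseDelayMs }: RetryOptions = { attempts: 3, baseDelayMs: 200 },
): Promise<T> {
  if (attempts < 1) throw new RangeError("attempts must be at least 1");
  let lastError: unknown;
  for (let i = 0; i < attempts; i++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      // Wait baseDelayMs, 2x, 4x, ... but not after the final attempt.
      if (i < attempts - 1) await sleep(baseDelayMs * 2 ** i);
    }
  }
  throw lastError;
}
```

A production version would likely retry only transient failures (for example HTTP 429 or 5xx responses) and escalate validation errors immediately.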
All trace calls in src/lib/langfuse.ts are wrapped in try/catch, because Langfuse is observability, not a critical path: if the Langfuse host is unreachable, the pipeline continues.
Step 12: Wire up the lead ingestion API route
Open app/api/leads/route.ts. This is the main entry point for document uploads and email webhooks:
```ts
import { NextRequest, NextResponse } from "next/server";
import crypto from "node:crypto";
import { extractLeadsFromFile, extractLeadFields } from "../../../src/lib/document.js";
import { classifyAndRoute } from "../../../src/lib/classifier.js";
import { routeToHubspot } from "../../../src/lib/handoff.js";
import { sendManualReviewNotification } from "../../../src/lib/notify.js";
import { initializeOrgBudget, checkBudget, recordSpend } from "../../../src/lib/budget.js";
import { initLangfuse, traceError } from "../../../src/lib/langfuse.js";
import { saveLead } from "../../../src/lib/store.js";
import type { LeadRecord } from "../../../src/lib/types.js";
import
```
The handler accepts two input shapes: a multipart form with a file field (for PDFs and images) or a JSON body with email and body (for email webhooks). Both paths produce a LeadRecord and classify the raw text before deciding to route or escalate.
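The branch between the two input shapes can be decided from the Content-Type header before the body is read. This sketch assumes the JSON webhook carries string email and body fields, as described above; detectIntakeKind and parseEmailWebhook are hypothetical helpers, not part of the route file.

```typescript
export type IntakeKind = "file" | "email" | "unsupported";

// multipart/form-data -> file upload; application/json -> email webhook.
export function detectIntakeKind(contentType: string | null): IntakeKind {
  // Drop parameters such as "; boundary=..." or "; charset=utf-8".
  const type = ((contentType ?? "").split(";")[0] ?? "").trim().toLowerCase();
  if (type === "multipart/form-data") return "file";
  if (type === "application/json") return "email";
  return "unsupported";
}

// Validates the webhook payload shape { email: string, body: string }.
export function parseEmailWebhook(payload: unknown): { email: string; body: string } | null {
  if (typeof payload !== "object" || payload === null) return null;
  const { email, body } = payload as Record<string, unknown>;
  if (typeof email !== "string" || typeof body !== "string" || body.trim() === "") return null;
  return { email, body };
}
```

In the handler, a "file" result would be read with request.formData() and an "email" result with request.json(); both are standard Fetch API Request methods that NextRequest inherits.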
In app/api/leads/[id]/route.ts, params is a Promise in this Next.js version, so await it before destructuring. The route returns the lead’s current status and HubSpot contact ID, or a 404 if the lead is no longer in the in-memory store (which is emptied whenever the server process restarts).
Step 14: Set up instrumentation for Node.js startup
Create src/instrumentation.ts. This fires each time the Next.js server process starts, only in the Node.js runtime (not Edge):
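The scaffold also sets experimental.instrumentationHook: true in next.config.ts. That flag was required on Next.js 14; from Next.js 15 onward (the version with async params used here), instrumentation.ts is stable and the flag is no longer needed.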
Step 15: Export public API from src/index.ts
Open src/index.ts and add the exports so consumers can import from a single entry point:
```ts
export type {
  LeadInput,
  LeadRecord,
  LeadStatus,
  PipelineResult,
  ExtractedFields,
} from "./lib/types.js";
export { LeadInputSchema, LeadRecordSchema, ExtractedFieldsSchema } from "./lib/types.js";
export { extractLeadsFromFile } from "./lib/document.js";
export { classifyAndRoute } from "./lib/classifier.js";
export { pushLeadToHubspot } from "./lib/hubspot.js";
export {
  budgetController,
  initializeOrgBudget,
  checkBudget,
  recordSpend,
  getBudgetState,
} from "./lib/budget.js";
```
Consumers can import everything they need from src/index.ts rather than reaching into src/lib/ directly.
Step 16: Add the in-memory lead store
Create src/lib/store.ts:
```ts
import type { LeadRecord, LeadStatus } from "./types.js";

const store = new Map<string, LeadRecord>();

export function saveLead(lead: LeadRecord): void {
  store.set(lead.id, lead);
}

export function getLead(id: string): LeadRecord | undefined {
  return store.get(id);
}

export function updateLeadStatus(
  id: string,
  status: LeadStatus,
  hubspotContactId?: string,
): void {
  const lead = store.get(id);
  if (lead) {
    lead.status = status;
    if (hubspotContactId !== undefined) {
      lead.hubspotContactId = hubspotContactId;
    }
  }
}
```
This is module-level state — it holds leads for the lifetime of the server process. In production you’d swap this for Redis or a database.
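One way to make that swap painless is to put the three operations behind an async interface now, so a Redis- or database-backed class can replace the Map later without touching callers. LeadStore, MapLeadStore, and the trimmed StoredLead type are hypothetical; the real LeadRecord has more fields.

```typescript
// Trimmed stand-in for LeadRecord; the real type has more fields.
export interface StoredLead {
  id: string;
  status: string;
  hubspotContactId?: string;
}

// Async so that a Redis or SQL implementation fits the same contract.
export interface LeadStore {
  save(lead: StoredLead): Promise<void>;
  get(id: string): Promise<StoredLead | undefined>;
  updateStatus(id: string, status: string, hubspotContactId?: string): Promise<boolean>;
}

// Hypothetical in-memory implementation with the same behavior as the Map above.
export class MapLeadStore implements LeadStore {
  private readonly leads = new Map<string, StoredLead>();

  async save(lead: StoredLead): Promise<void> {
    // Store a copy so callers cannot mutate stored state by accident.
    this.leads.set(lead.id, { ...lead });
  }

  async get(id: string): Promise<StoredLead | undefined> {
    const lead = this.leads.get(id);
    return lead ? { ...lead } : undefined;
  }

  // Returns false for an unknown id instead of silently doing nothing.
  async updateStatus(id: string, status: string, hubspotContactId?: string): Promise<boolean> {
    const lead = this.leads.get(id);
    if (!lead) return false;
    lead.status = status;
    if (hubspotContactId !== undefined) lead.hubspotContactId = hubspotContactId;
    return true;
  }
}
```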
Step 17: Run the tests
The test suite lives under tests/ and mocks all external services with MSW. Run it with:
```bash
pnpm test
```
Expected output: vitest prints a summary. All test cases pass. The coverage report shows all four metrics (lines, branches, functions, statements) above 90% on src/**/*.ts and app/**/route.ts.
The MSW handlers in tests/setup.ts intercept:
POST https://api.cohere.com/v2/chat — returns mock extracted fields JSON
POST https://api.hubapi.com/crm/v3/objects/contacts — returns a mock contact ID
POST https://api.hubapi.com/crm/v3/objects/contacts/search — returns empty results
POST https://slack.com/api/chat.postMessage — returns ok: true
onUnhandledRequest: "error" ensures no test accidentally hits a live API: any unhandled request throws instead of silently passing.
Next steps
Replace the in-memory store with a Redis or PostgreSQL backend for production durability across server restarts.
Add a deduplication step before HubSpot push — use HubSpot’s search API to check for existing contacts by email before deciding between create and update.
Wire trackPipelineLatency into each stage of the ingestion route to surface slowdowns in the Langfuse dashboard.
Add a human-verification callback endpoint at POST /api/leads/[id]/verify that marks a PENDING_REVIEW lead as READY after a sales rep approves it in Slack.
Set up Langfuse prompt variants to compare command-a-03-2025 against a cheaper model and auto-downgrade when budget is in the Degraded state.
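For the verification callback, the key rule is that only a lead awaiting review may be promoted. A minimal guard, assuming PENDING_REVIEW and READY are LeadStatus values as the list above implies; approveReview and ReviewOutcome are hypothetical names.

```typescript
export type ReviewOutcome =
  | { ok: true; status: "READY"; changed: boolean }
  | { ok: false; reason: string };

// Hypothetical guard for POST /api/leads/[id]/verify: only PENDING_REVIEW -> READY is allowed.
export function approveReview(currentStatus: string): ReviewOutcome {
  // A repeated click on the Slack approve button is a no-op, not an error.
  if (currentStatus === "READY") return { ok: true, status: "READY", changed: false };
  if (currentStatus === "PENDING_REVIEW") return { ok: true, status: "READY", changed: true };
  return { ok: false, reason: `Lead is ${currentStatus}, not PENDING_REVIEW` };
}
```

Treating a second approval as idempotent keeps the endpoint safe when Slack retries an interaction or a rep clicks twice.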
Remainder of the src/lib/document.ts listing from Step 5:

```ts
  ?? "eng");
  try {
    const ret = await worker.recognize(imageBuffer);
    return ret.data.text;
  } finally {
    await worker.terminate();
  }
}

export async function extractTextFromPdf(pdfBuffer: Buffer): Promise<string> {
  const parse = new PDFParse({ data: pdfBuffer });
  try {
    const result = await parse.getText();
    return result.text;
  } finally {
    await parse.destroy();
  }
}

// async so an unsupported MIME type surfaces as a rejected promise, like every other failure.
export async function detectAndExtractText(
  fileBuffer: Buffer,
  mimeType: string,
): Promise<string> {
  if (mimeType === "application/pdf") {
    return extractTextFromPdf(fileBuffer);
  }
  if (mimeType.startsWith("image/")) {
    return extractTextFromImage(fileBuffer);
  }
  throw new Error(`Unsupported MIME type: ${mimeType}`);
}

export function chunkAndIndexText(text: string): {
  chunks: string[];
  estimatedTokens: number;
} {
  const chunkSize = 512;
  const overlap = 50;
  const chunks: string[] = [];
  let start = 0;
  while (start < text.length) {
    const end = Math.min(start + chunkSize, text.length);
```
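The chunkAndIndexText listing above stops inside its loop. Below is a minimal sketch of how a fixed-size window with overlap is typically finished, using the same 512-character chunks and 50-character overlap. chunkWithOverlap is a hypothetical name, and the four-characters-per-token estimate is a rough heuristic assumed here, not taken from the recipe.

```typescript
export function chunkWithOverlap(
  text: string,
  chunkSize = 512,
  overlap = 50,
): { chunks: string[]; estimatedTokens: number } {
  // An overlap as large as the chunk would never advance the window.
  if (overlap >= chunkSize) throw new RangeError("overlap must be smaller than chunkSize");
  const chunks: string[] = [];
  let start = 0;
  while (start < text.length) {
    const end = Math.min(start + chunkSize, text.length);
    chunks.push(text.slice(start, end));
    if (end === text.length) break; // this window reached the end of the text
    start = end - overlap; // step back so neighboring chunks share `overlap` characters
  }
  // Assumed heuristic: roughly 4 characters per token for English text.
  return { chunks, estimatedTokens: Math.ceil(text.length / 4) };
}
```

For a 1,000-character input this yields windows starting at 0, 462, and 924, so no sentence that straddles a boundary is lost to field extraction.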