SMB sales teams lose leads buried in email attachments and messy web forms, relying on manual data entry that delays follow-up and lets opportunities slip.
In this tutorial you’ll build an automated lead intake pipeline that accepts files from email attachments and web forms, parses text from PDFs and images using OCR, extracts structured lead fields with Google Gemini, scrubs PII, classifies each lead by quality, and upserts qualified records into Salesforce. By the end you’ll have a running Next.js API with three endpoints, a full test suite with coverage, and a solid understanding of how to chain real AI packages — @google/genai, @reaatech/confidence-router, @reaatech/agent-budget-engine, @presidio-dev/hai-guardrails, and more — into an ingestion service you can extend.
Prerequisites
Node.js >= 22 with pnpm 10.x installed (pnpm --version should show 10.x)
A Google Gemini API key (from Google AI Studio), read from GEMINI_API_KEY by the extraction client
An OpenAI API key, used by the LLM cache’s embedding model (text-embedding-3-small)
A Salesforce Developer Edition account (free at developer.salesforce.com) — you’ll need the username, password, and security token
A Langfuse project (free tier at langfuse.com) for tracing
Familiarity with TypeScript and basic Next.js App Router conventions
Step 1: Scaffold the Next.js project
Create a fresh Next.js project with the App Router and TypeScript. Next.js 16 with create-next-app gives you the right starting shell — you’ll replace the default content as you go.
Expected output: create-next-app prints success messages and creates a google-gemini-lead-intake/ directory with app/, src/, package.json, tsconfig.json, next.config.ts, and eslint.config.mjs. Your working directory is now google-gemini-lead-intake.
Step 2: Install dependencies
Install every package the recipe needs. Each version is pinned exactly — no ^ or ~ — so your build is reproducible. The REAA packages (@reaatech/*) are private; you’ll find their tarballs in your workspace or registry.
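To confirm that no range specifier slipped in after installing, you can scan package.json for versions that are not exact. A minimal sketch, assuming the standard dependencies and devDependencies fields; findUnpinned is a hypothetical helper, not part of the recipe, and it skips file: and workspace: specifiers because local tarballs and workspace links are not semver ranges:

```typescript
// Hypothetical helper: list dependencies whose version is not an exact pin.
export interface PackageJson {
  dependencies?: Record<string, string>;
  devDependencies?: Record<string, string>;
}

// Plain semver such as "1.2.3" or "1.2.3-beta.1"; rejects ^, ~, ranges, and dist-tags.
const EXACT_VERSION = /^\d+\.\d+\.\d+(?:-[0-9A-Za-z.-]+)?(?:\+[0-9A-Za-z.-]+)?$/;

export function findUnpinned(pkg: PackageJson): string[] {
  const offenders: string[] = [];
  for (const field of ["dependencies", "devDependencies"] as const) {
    for (const [name, version] of Object.entries(pkg[field] ?? {})) {
      // Local tarballs and workspace links are not semver ranges, so skip them.
      if (version.startsWith("file:") || version.startsWith("workspace:")) continue;
      if (!EXACT_VERSION.test(version)) offenders.push(`${field}: ${name}@${version}`);
    }
  }
  return offenders;
}
```

In CI you could read package.json with node:fs, pass the parsed object to findUnpinned, and fail the build whenever the returned list is non-empty.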
Step 3: Configure environment variables
Create a .env.local file in the project root with the credentials listed in the prerequisites. The recipe’s vitest.config.ts defines the same variables in its test.env block with test-safe placeholder values, so the test runner works without a real .env file. The scaffold also creates a minimal next.config.ts; leave it as-is for now.
Expected output: you have a .env.local file (git-ignored by default) with all required credentials, and you can verify with cat .env.local.
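Because some modules read credentials at import time (the Gemini client throws if GEMINI_API_KEY is missing), it can help to fail fast with a single message that lists every missing variable. A minimal sketch; missingEnv and assertEnv are hypothetical helpers, and apart from GEMINI_API_KEY any variable names you pass in should match whatever your .env.local actually uses:

```typescript
// Returns the names of required variables that are unset or blank.
export function missingEnv(
  required: readonly string[],
  env: Record<string, string | undefined>,
): string[] {
  return required.filter((name) => !env[name]?.trim());
}

// Throws once with every missing name, instead of failing on the first one.
export function assertEnv(
  required: readonly string[],
  env: Record<string, string | undefined>,
): void {
  const missing = missingEnv(required, env);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }
}
```

Call it as assertEnv(["GEMINI_API_KEY", ...], process.env) early in server startup; taking env as a parameter keeps the check easy to test with fake values.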
Step 4: Define the lead domain types
The pipeline moves data through three representations: a raw inbound payload, an extracted lead from Gemini, and a qualified lead with a routing decision. Define these as TypeScript interfaces and Zod schemas.
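The field names below come from how later steps use these types: ExtractedLeadSchema.parse receives name, company, email, phone, needs, source, and rawText, and the classifier produces a route, clarify, or fallback decision. Everything else (the RawInboundPayload fields, the "web_form" source value, and the toQualifiedLead helper) is a hypothetical sketch, written with plain interfaces instead of Zod so it runs without dependencies:

```typescript
// Where a lead came from; "email_attachment" appears in the recipe, "web_form" is assumed.
export type LeadSource = "email_attachment" | "web_form";

// 1. Raw inbound payload: bytes plus the MIME type the parser dispatches on (assumed shape).
export interface RawInboundPayload {
  source: LeadSource;
  content: Uint8Array;
  mimeType: string;
}

// 2. Lead fields extracted by Gemini; any field may be absent from the source text.
export interface ExtractedLead {
  name?: string;
  company?: string;
  email?: string;
  phone?: string;
  needs?: string;
  source: LeadSource;
  rawText: string;
}

// 3. Qualified lead: the extracted fields plus the routing decision.
export type RouteDecision = "route" | "clarify" | "fallback";

export interface QualifiedLead extends ExtractedLead {
  decision: RouteDecision;
  confidence: number; // assumed to lie in [0, 1]
}

// Hypothetical helper: attach a decision, rejecting out-of-range or NaN confidence.
export function toQualifiedLead(
  lead: ExtractedLead,
  decision: RouteDecision,
  confidence: number,
): QualifiedLead {
  if (!(confidence >= 0 && confidence <= 1)) {
    throw new RangeError(`confidence must be in [0, 1], got ${confidence}`);
  }
  return { ...lead, decision, confidence };
}
```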
```ts
export * from "./lead.js";
export * from "./lead-schemas.js";
```
Expected output: pnpm typecheck passes with no errors in these files.
Step 5: Create the Gemini extraction client
This module wraps @google/genai to call Gemini 2.5 Flash with a structured prompt that asks for lead fields as JSON. It parses the response and validates it against ExtractedLeadSchema.
Create src/lib/gemini/client.ts:
```ts
import { GoogleGenAI } from "@google/genai";
import { ExtractedLeadSchema } from "../../types/lead-schemas.js";
import type { ExtractedLead } from "../../types/lead.js";
import { GeminiError } from "../errors.js";

// The narrow surface the pipeline needs, so tests can pass a stub instead of the SDK.
export interface GeminiClient {
  generateContent(prompt: string | string[], opts?: { model?: string }): Promise<{ text?: string }>;
}

export function createGeminiClient(): GeminiClient {
  const apiKey = process.env.GEMINI_API_KEY;
  if (!apiKey) {
    throw new GeminiError("GEMINI_API_KEY is not set", 500);
  }
  const ai = new GoogleGenAI({ apiKey });

  return {
    async generateContent(prompt: string | string[], opts?: { model?: string }) {
      try {
        const model = opts?.model ?? "gemini-2.5-flash";
        // `contents` accepts a single string or an array of string parts as-is.
        const response = await ai.models.generateContent({ model, contents: prompt });
        return { text: response.text };
      } catch (e: unknown) {
        // Normalize SDK errors into GeminiError, keeping the HTTP status when present.
        if (e && typeof e === "object" && "name" in e && "message" in e) {
          const err = e as { name: string; message: string; status?: number };
          throw new GeminiError(`Gemini API error: ${err.message}`, err.status ?? 500);
        }
        throw new GeminiError("Unknown Gemini API error", 500);
      }
    },
  };
}

export async function extractLeadFromText(
  ai: GeminiClient,
  rawText: string,
  // Defaults to the original behavior; pass the real source for web-form leads.
  source: ExtractedLead["source"] = "email_attachment",
): Promise<ExtractedLead> {
  const prompt = [
    "Extract lead information from the following text. Return ONLY valid JSON with these fields:",
    '{ "name": string | null, "company": string | null, "email": string | null, "phone": string | null, "needs": string | null }',
    "Text:",
    rawText,
  ];
  const response = await ai.generateContent(prompt, { model: "gemini-2.5-flash" });

  // Models sometimes wrap JSON in a markdown code fence; strip it before parsing.
  const text = (response.text ?? "")
    .trim()
    .replace(/^`{3}(?:json)?\s*/i, "")
    .replace(/\s*`{3}$/, "");

  let parsed: Record<string, string | null>;
  try {
    parsed = JSON.parse(text) as Record<string, string | null>;
  } catch {
    throw new GeminiError("Failed to parse Gemini response as JSON", 500);
  }

  // Map JSON nulls to undefined so optional schema fields validate.
  return ExtractedLeadSchema.parse({
    name: parsed.name ?? undefined,
    company: parsed.company ?? undefined,
    email: parsed.email ?? undefined,
    phone: parsed.phone ?? undefined,
    needs: parsed.needs ?? undefined,
    source,
    rawText,
  });
}

// Created at import time: GEMINI_API_KEY must be set before this module loads
// (vitest.config.ts supplies a placeholder for tests).
export const geminiClient: GeminiClient = createGeminiClient();
```
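Because extractLeadFromText depends only on the GeminiClient interface, tests can hand it a stub instead of calling the real API. A minimal sketch; createStubGeminiClient is a hypothetical helper, and the interface is redeclared here so the block runs on its own:

```typescript
// Same shape as the GeminiClient interface in client.ts (redeclared for a standalone example).
export interface GeminiClient {
  generateContent(prompt: string | string[], opts?: { model?: string }): Promise<{ text?: string }>;
}

export interface StubCall {
  prompt: string | string[];
  model?: string;
}

// Returns canned responses in order and records every prompt it receives.
export function createStubGeminiClient(responses: string[]): GeminiClient & { calls: StubCall[] } {
  const queue = [...responses];
  const calls: StubCall[] = [];
  return {
    calls,
    async generateContent(prompt: string | string[], opts?: { model?: string }) {
      calls.push({ prompt, model: opts?.model });
      const next = queue.shift();
      if (next === undefined) {
        // Surfacing exhaustion loudly catches tests that make more calls than expected.
        throw new Error("Stub Gemini client ran out of canned responses");
      }
      return { text: next };
    },
  };
}
```

Passing the stub as the first argument to extractLeadFromText lets you assert on both the parsed lead and the exact prompt that was sent.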
Create src/lib/errors.ts with the custom error types used across the pipeline:
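A minimal sketch inferred from how the other modules construct these errors (new GeminiError(message, status) and new DocumentParseError(message)). The PipelineError base class and the 415 default for parse errors are assumptions, not the recipe’s confirmed code:

```typescript
// Hypothetical base class so route handlers can map any pipeline error to an HTTP status.
export class PipelineError extends Error {
  readonly status: number;

  constructor(message: string, status = 500) {
    super(message);
    this.status = status;
    // Use the concrete subclass name (e.g. "GeminiError") in logs and stack traces.
    this.name = new.target.name;
  }
}

// Thrown by the Gemini client; status mirrors the upstream HTTP status when known.
export class GeminiError extends PipelineError {
  constructor(message: string, status = 500) {
    super(message, status);
  }
}

// Thrown by the document parser; 415 (Unsupported Media Type) is an assumed default.
export class DocumentParseError extends PipelineError {
  constructor(message: string, status = 415) {
    super(message, status);
  }
}
```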
The PII guard module uses @presidio-dev/hai-guardrails to detect prompt injection and redact sensitive data. The observability module wraps Langfuse tracing around each pipeline run.
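To illustrate what redaction does to a lead’s text, here is a dependency-free stand-in that masks email addresses and phone-like numbers with regular expressions. It is not the @presidio-dev/hai-guardrails API, and regexes alone miss many kinds of PII (names, street addresses) while over-matching others (dates, order numbers):

```typescript
// Illustrative stand-in only: simple patterns, not a complete PII detector.
const EMAIL = /[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}/g;
// 7+ characters that start and end with a digit, allowing spaces, dots, dashes, parentheses, and a leading +.
const PHONE = /\+?\d[\d\s().-]{5,}\d/g;

export interface RedactionResult {
  text: string;
  counts: { email: number; phone: number };
}

export function redactPii(input: string): RedactionResult {
  let email = 0;
  let phone = 0;
  // Redact emails first so digits inside addresses are not picked up as phone numbers.
  const noEmails = input.replace(EMAIL, () => {
    email += 1;
    return "[EMAIL]";
  });
  const text = noEmails.replace(PHONE, () => {
    phone += 1;
    return "[PHONE]";
  });
  return { text, counts: { email, phone } };
}
```

Returning counts alongside the text gives the observability layer something to record without logging the sensitive values themselves.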
The budget controller enforces a per-lead spending limit, so no single lead can trigger unbounded model spend. The LLM cache reuses Gemini responses for identical or semantically similar inputs.
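The idea behind a per-lead spending limit fits in a few lines: record the estimated cost of each model call against the lead and refuse any call that would push it past the cap. This is a sketch of the concept only. It does not reproduce the @reaatech/agent-budget-engine API, and the class names, integer-cent accounting, and up-front reservation are assumptions:

```typescript
// Hypothetical error raised when a call would exceed the lead's budget.
export class BudgetExceededError extends Error {
  constructor(leadId: string, spentCents: number, requestedCents: number, limitCents: number) {
    super(`Lead ${leadId}: ${spentCents}c spent + ${requestedCents}c requested exceeds ${limitCents}c limit`);
    this.name = "BudgetExceededError";
  }
}

// Hypothetical per-lead guard; integer cents avoid floating-point drift.
export class PerLeadBudget {
  private readonly spent = new Map<string, number>();
  private readonly limitCents: number;

  constructor(limitCents: number) {
    if (!Number.isInteger(limitCents) || limitCents < 0) {
      throw new RangeError("limitCents must be a non-negative integer");
    }
    this.limitCents = limitCents;
  }

  remaining(leadId: string): number {
    return this.limitCents - (this.spent.get(leadId) ?? 0);
  }

  // Call before the model request: records the estimated cost, or throws without recording it.
  reserve(leadId: string, costCents: number): void {
    const spent = this.spent.get(leadId) ?? 0;
    if (spent + costCents > this.limitCents) {
      throw new BudgetExceededError(leadId, spent, costCents, this.limitCents);
    }
    this.spent.set(leadId, spent + costCents);
  }
}
```

Reserving before the call (rather than charging afterwards) is what makes the limit a hard cap; you might later reconcile the reservation against actual token usage.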
Expected output: pnpm typecheck passes with all the REAA package imports resolved.
Step 8: Build the document parser
The parser handles three content types: PDF text extraction via unpdf, image OCR via tesseract.js, and plain text passthrough. Unsupported MIME types throw a DocumentParseError.
Create src/services/document-parser.ts:
```ts
import { extractText, getDocumentProxy } from "unpdf";
import { createWorker } from "tesseract.js";
import type { DocumentParseResult } from "@/src/types/lead";
import { DocumentParseError } from "@/src/lib/errors";

export async function parseDocument(
  content: Uint8Array,
  mimeType: string,
): Promise<DocumentParseResult> {
  // Drop parameters such as "; charset=utf-8" and normalize case before matching.
  const type = (mimeType.split(";")[0] ?? "").trim().toLowerCase();

  if (type === "application/pdf") {
    // Copy the bytes so the caller's buffer is left untouched.
    const pdf = await getDocumentProxy(new Uint8Array(content));
    const { totalPages, text } = await extractText(pdf, { mergePages: true });
    return { text, totalPages, parseMethod: "pdf" };
  }

  if (type.startsWith("image/")) {
    const worker = await createWorker("eng");
    try {
      const ret = await worker.recognize(Buffer.from(content));
      return { text: ret.data.text, totalPages: 1, parseMethod: "ocr" };
    } finally {
      // Always release the OCR worker, even when recognition fails.
      await worker.terminate();
    }
  }

  if (type === "text/plain") {
    const text = new TextDecoder().decode(content);
    return { text, totalPages: 1, parseMethod: "form" };
  }

  throw new DocumentParseError(`Unsupported MIME type: ${mimeType}`);
}
```
Expected output: pnpm typecheck is clean. The parser is self-contained: it takes bytes and a MIME type, returns structured text, and keeps no shared state, which makes it easy to test in isolation.
Step 9: Build lead extraction and classification services
The extractor wraps the Gemini call with budget checks and LLM caching. The classifier applies heuristic rules (name + email + needs = high value, etc.) and feeds them into the @reaatech/confidence-router for the final route/clarify/fallback decision.
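A sketch of the heuristic half, assuming the labels, confidence values, and thresholds shown; the recipe states only the first rule (name + email + needs = high value). The route/clarify/fallback cut-offs stand in for the decision @reaatech/confidence-router makes, whose actual API is not shown here:

```typescript
export type LeadLabel = "high_value" | "needs_info" | "low_quality";
export type Route = "route" | "clarify" | "fallback";

export interface LeadFields {
  name?: string;
  email?: string;
  phone?: string;
  needs?: string;
}

export interface Classification {
  label: LeadLabel;
  confidence: number;
}

const present = (v?: string): boolean => typeof v === "string" && v.trim().length > 0;

// Heuristic labels: the first rule mirrors the recipe; the others and all numbers are assumptions.
export function classifyLead(lead: LeadFields): Classification {
  if (present(lead.name) && present(lead.email) && present(lead.needs)) {
    return { label: "high_value", confidence: 0.9 };
  }
  if (present(lead.email) || present(lead.phone)) {
    // Reachable but incomplete: worth asking a follow-up question.
    return { label: "needs_info", confidence: 0.6 };
  }
  return { label: "low_quality", confidence: 0.2 };
}

// Assumed thresholds: confident leads are routed, middling ones get a clarification request.
export function decideRoute(confidence: number, routeAt = 0.8, clarifyAt = 0.5): Route {
  if (confidence >= routeAt) return "route";
  if (confidence >= clarifyAt) return "clarify";
  return "fallback";
}
```

Keeping the label logic separate from the threshold decision means you can tune the cut-offs without touching the heuristics.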
Expected output: pnpm typecheck passes. The classifier maps heuristic labels plus confidence through the router for routing decisions.
Step 10: Wire the pipeline orchestrator
This is the central function that chains everything together: PII scrubbing, document parsing, Gemini extraction, field-level redaction, confidence routing, and Salesforce upsert. It wraps the entire flow in a Langfuse trace.
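The chaining can be sketched with each stage injected as a dependency, which keeps the orchestrator testable without Gemini, Salesforce, or Langfuse. This is a hypothetical shape, not the recipe’s orchestrator: the stage names, the trace hook (standing in for a Langfuse span), and the rule that only "route" decisions are upserted are all assumptions:

```typescript
export type Decision = "route" | "clarify" | "fallback";

export interface Lead {
  name?: string;
  email?: string;
  needs?: string;
  rawText: string;
}

// Hypothetical stage contracts; the real modules from earlier steps have their own signatures.
export interface PipelineStages<P> {
  scrubInput(payload: P): Promise<P>; // PII / prompt-injection guard
  parse(payload: P): Promise<string>; // document parser
  extract(text: string): Promise<Lead>; // Gemini extraction (budgeted, cached)
  redact(lead: Lead): Promise<Lead>; // field-level redaction
  decide(lead: Lead): Promise<Decision>; // confidence routing
  upsert(lead: Lead): Promise<string>; // Salesforce upsert, returns a record id
  trace?(stage: string, ms: number): void; // observability hook
}

export interface PipelineResult {
  decision: Decision;
  lead: Lead;
  recordId?: string;
}

export async function runPipeline<P>(payload: P, s: PipelineStages<P>): Promise<PipelineResult> {
  // Times each stage and reports it through the optional trace hook, even on failure.
  async function timed<T>(stage: string, fn: () => Promise<T>): Promise<T> {
    const start = Date.now();
    try {
      return await fn();
    } finally {
      s.trace?.(stage, Date.now() - start);
    }
  }

  const clean = await timed("scrub", () => s.scrubInput(payload));
  const text = await timed("parse", () => s.parse(clean));
  const extracted = await timed("extract", () => s.extract(text));
  const lead = await timed("redact", () => s.redact(extracted));
  const decision = await timed("decide", () => s.decide(lead));

  // Only confidently routed leads are written to the CRM in this sketch.
  if (decision !== "route") return { decision, lead };
  const recordId = await timed("upsert", () => s.upsert(lead));
  return { decision, lead, recordId };
}
```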
Next steps
Add webhook support: have src/workers/classify.ts run on a cron trigger using @reaatech/classifier-evals to periodically evaluate classification accuracy and push metrics to Langfuse, catching model drift before it affects lead quality.
Upgrade caching storage — swap InMemoryAdapter in @reaatech/llm-cache for Redis or DynamoDB storage adapters so caching survives restarts and scales across multiple server instances.
Enrich with Hybrid RAG — the pipeline already imports @reaatech/hybrid-rag types; wire a full retrieval step that pulls context from a knowledge base (pricing docs, product specs) before routing, so low-confidence leads get enriched rather than falling back.
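For the caching upgrade, a storage swap stays painless if the cache depends on a small key-value interface that an in-memory map, Redis, or DynamoDB client can all implement. A sketch under that assumption; LlmCacheStorage, MemoryStorage, and cachedCall are hypothetical names, not the @reaatech/llm-cache adapter API, and the sketch covers exact-match reuse only (no semantic similarity):

```typescript
// Hypothetical storage contract; a Redis-backed class would implement the same two methods.
export interface LlmCacheStorage {
  get(key: string): Promise<string | undefined>;
  set(key: string, value: string, ttlMs: number): Promise<void>;
}

// In-memory implementation with per-entry expiry (lost on restart, not shared across instances).
export class MemoryStorage implements LlmCacheStorage {
  private readonly entries = new Map<string, { value: string; expiresAt: number }>();
  private readonly now: () => number;

  constructor(now: () => number = Date.now) {
    this.now = now;
  }

  async get(key: string): Promise<string | undefined> {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (entry.expiresAt <= this.now()) {
      this.entries.delete(key); // lazily evict expired entries
      return undefined;
    }
    return entry.value;
  }

  async set(key: string, value: string, ttlMs: number): Promise<void> {
    this.entries.set(key, { value, expiresAt: this.now() + ttlMs });
  }
}

// Exact-match read-through: calls the model only when the prompt is not cached.
// A production key would likely hash long prompts instead of embedding them.
export async function cachedCall(
  storage: LlmCacheStorage,
  prompt: string,
  call: (prompt: string) => Promise<string>,
  ttlMs = 60 * 60 * 1000,
): Promise<string> {
  const key = `llm:${prompt}`;
  const hit = await storage.get(key);
  if (hit !== undefined) return hit;
  const result = await call(prompt);
  await storage.set(key, result, ttlMs);
  return result;
}
```

Because cachedCall only sees the interface, moving to a shared store means writing one new class; the injectable clock in MemoryStorage exists purely to make expiry testable.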