Independent financial advisors lose up to 40% of web inquiries simply because they can’t respond fast enough. Manual triage — sorting tire-kickers from serious prospects — burns billable hours, and no-shows plague calendared consultations that do get booked. This tutorial builds a server-side lead intake agent that automates the full pipeline: classify incoming messages by intent, extract contact profiles even from messy text, enforce per-lead cost budgets, store session context in DynamoDB, and hand off qualified leads to HubSpot for scheduling. By the end you’ll have a working POST /api/lead endpoint backed by Databricks LLM serving endpoints, confidence-based routing, and observability via Langfuse.
Prerequisites
Node.js 22+ and pnpm 10
A Databricks workspace with two model serving endpoints (lead-classifier and lead-extractor) deployed — or mock them for local testing
A HubSpot account with a private app access token (CRM scopes: contacts, deals, engagements)
A Langfuse account (cloud or self-hosted) for observability tracing
An AWS account with a DynamoDB table lead-sessions (and lead-sessions_messages) in us-east-1
Basic familiarity with Next.js App Router, TypeScript, and Zod
Step 1: Scaffold the project and install dependencies
Create a new Next.js 16 project with the App Router and TypeScript. The scaffold agent sets up the shell — you add the feature code.
terminal
pnpm create next-app@latest lead-intake-agent \
  --typescript \
  --app \
  --eslint \
  --import-alias "@/*"
cd lead-intake-agent
Open package.json and replace the dependencies block with exact-pinned versions. Every REAA package and third-party library must be pinned to the exact version (no ^ or ~):
Expected output: pnpm-lock.yaml is generated. No peer-dependency warnings from the REAA packages.
Step 2: Configure environment variables
Create .env.example with placeholder values for every integration. The app reads these at runtime:
env
# Env vars used by databricks-lead-intake-agent-for-smb-financial-advisors.
# The builder adds entries here as it wires up each integration.
# Keep placeholders only; never commit real values.
DATABRICKS_HOST=https://<your-workspace>.cloud.databricks.com
DATABRICKS_TOKEN=dapi<your-pat>
HUBSPOT_ACCESS_TOKEN=<your-hubspot-private-app-token>
LANGFUSE_PUBLIC_KEY=<your-langfuse-public-key>
LANGFUSE_SECRET_KEY=<your-langfuse-secret-key>
LANGFUSE_BASE_URL=https://cloud.langfuse.com
AWS_REGION=us-east-1
DYNAMODB_SESSIONS_TABLE=lead-sessions
LEAD_BUDGET_LIMIT_USD=0.50
CONFIDENCE_ROUTE_THRESHOLD=0.8
CONFIDENCE_FALLBACK_THRESHOLD=0.3
NODE_ENV=development
Copy it for local development:
terminal
cp .env.example .env.local
Expected output: A .env.local file. The app uses process.env.X references; no secrets are committed.
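Because every threshold arrives as a string, it can help to parse and sanity-check the values once at startup. The following is a minimal sketch, not part of the published recipe: the LeadConfig shape and the loadLeadConfig and readNumber helpers are hypothetical names, and the defaults simply mirror the placeholders in .env.example.

```typescript
// Hypothetical config shape; field names mirror the variables in .env.example.
interface LeadConfig {
  budgetLimitUsd: number;
  routeThreshold: number;
  fallbackThreshold: number;
  sessionsTable: string;
}

type Env = Record<string, string | undefined>;

// Parse a numeric variable, falling back to a default when it is unset or blank.
function readNumber(env: Env, key: string, fallback: number): number {
  const raw = env[key];
  if (raw === undefined || raw.trim() === "") return fallback;
  const value = Number(raw);
  if (!Number.isFinite(value)) {
    throw new Error(`${key} must be a number, got "${raw}"`);
  }
  return value;
}

// Build the config from an env-like object (pass process.env in the real app).
function loadLeadConfig(env: Env): LeadConfig {
  const config: LeadConfig = {
    budgetLimitUsd: readNumber(env, "LEAD_BUDGET_LIMIT_USD", 0.5),
    routeThreshold: readNumber(env, "CONFIDENCE_ROUTE_THRESHOLD", 0.8),
    fallbackThreshold: readNumber(env, "CONFIDENCE_FALLBACK_THRESHOLD", 0.3),
    sessionsTable: env["DYNAMODB_SESSIONS_TABLE"] ?? "lead-sessions",
  };
  // Assumption: the fallback threshold is meant to sit strictly below the route threshold.
  if (config.fallbackThreshold >= config.routeThreshold) {
    throw new Error("CONFIDENCE_FALLBACK_THRESHOLD must be below CONFIDENCE_ROUTE_THRESHOLD");
  }
  return config;
}
```

Failing fast here means a mistyped threshold surfaces at boot rather than as an odd routing decision on a live lead.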
Step 3: Define Zod schemas and TypeScript types
Create src/lib/schemas.ts — these are the validation schemas for every data structure in the pipeline. Zod 4.x uses the .email() helper for email validation:
Expected output: Two files (src/lib/schemas.ts and src/lib/types.ts) with clean TypeScript. The LeadProcessingResult type drives the three-way decision tree: every pipeline path produces one of these.
Step 4: Build the confidence router service
The confidence router classifies raw incoming messages by matching keywords. Create src/services/confidence-router-service.ts:
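To make the idea concrete, here is a dependency-free sketch of keyword scoring followed by the three-way decision. It is not the router package's API: the RULES table, scoreIntent, and decide are hypothetical, the scoring formula (fraction of a rule's keywords found) is an assumption, and the default thresholds come from CONFIDENCE_ROUTE_THRESHOLD and CONFIDENCE_FALLBACK_THRESHOLD in .env.example.

```typescript
type Decision = "ROUTE" | "CLARIFY" | "FALLBACK";

interface IntentRule {
  intent: string;
  keywords: string[];
}

// Hypothetical keyword rules, for illustration only.
const RULES: IntentRule[] = [
  { intent: "schedule_consultation", keywords: ["schedule", "meeting", "call", "appointment"] },
  { intent: "investment_inquiry", keywords: ["invest", "portfolio", "retirement", "401k"] },
];

// Score each rule by the fraction of its keywords found in the message
// (plain substring match) and return the best-scoring intent.
function scoreIntent(message: string): { intent: string | null; confidence: number } {
  const text = message.toLowerCase();
  let best: { intent: string | null; confidence: number } = { intent: null, confidence: 0 };
  for (const rule of RULES) {
    const hits = rule.keywords.filter((keyword) => text.includes(keyword)).length;
    const confidence = hits / rule.keywords.length;
    if (confidence > best.confidence) {
      best = { intent: rule.intent, confidence };
    }
  }
  return best;
}

// Map a confidence score to a decision. Assumption: scores at or above the
// route threshold are routed, scores below the fallback threshold fall back,
// and everything in between asks the lead for clarification.
function decide(confidence: number, routeThreshold = 0.8, fallbackThreshold = 0.3): Decision {
  if (confidence >= routeThreshold) return "ROUTE";
  if (confidence < fallbackThreshold) return "FALLBACK";
  return "CLARIFY";
}
```

With these rules, "Can we schedule a meeting?" matches two of four scheduling keywords, scores 0.5, and lands in CLARIFY; a message with no keyword hits scores 0 and falls back.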
Step 5: Build the budget service
The budget engine enforces per-lead cost caps so a runaway LLM call chain doesn’t burn through your Databricks spend. Create src/services/budget-service.ts:
ts
import { BudgetController } from "@reaatech/agent-budget-engine";
import { SpendStore } from "@reaatech/agent-budget-spend-tracker";
import { BudgetScope } from "@reaatech/agent-budget-types";

const store = new SpendStore();
export const controller = new BudgetController({ spendTracker: store });

export function defineLeadBudget(leadId: string): void {
  controller.defineBudget({
    scopeType: BudgetScope.Task,
    scopeKey: leadId,
    limit: +(process.env.LEAD_BUDGET_LIMIT_USD ?? "0.50"),
    policy: {
      softCap: 0.8,
      hardCap: 1.0,
      autoDowngrade: [],
      disableTools: [],
    },
  });
}

export function checkLeadBudget(
  leadId: string,
  estimatedCost: number,
  modelId: string,
) {
  return controller.check({
    scopeType: BudgetScope.Task,
    scopeKey: leadId,
    estimatedCost,
    modelId,
    tools: [],
  });
}

export function recordLeadSpend(
  leadId: string,
  cost: number,
  inputTokens: number,
  outputTokens: number,
  modelId: string,
): void {
  controller.record({
    requestId: crypto.randomUUID(),
    scopeType: BudgetScope.Task,
    scopeKey: leadId,
    cost,
    inputTokens,
    outputTokens,
    modelId,
    provider: "databricks",
    timestamp: new Date(),
  });
}

export function getLeadBudgetState(leadId: string) {
  const state = controller.getState(BudgetScope.Task, leadId);
  if (!state) {
    return { spent: 0, remaining: 0, state: "Active" as const };
  }
  return {
    spent: state.spent,
    remaining: state.remaining,
    state: state.state,
  };
}
Expected output: With LEAD_BUDGET_LIMIT_USD set to 0.50 and $0.45 already spent on LLM calls, a check with estimatedCost: 0.10 is denied (allowed: false) because the projected total of $0.55 would exceed the $0.50 hard cap.
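The arithmetic behind that denial can be sketched without the budget packages. The InMemoryLeadBudget class below is a hypothetical stand-in, not the engine's implementation; it assumes a call is allowed while the projected spend stays at or below limit × hardCap, and that the soft cap only flags a warning.

```typescript
interface BudgetCheck {
  allowed: boolean;        // false when the projected spend would pass the hard cap
  softCapReached: boolean; // true once recorded spend has reached the soft cap
  projected: number;       // recorded spend plus the estimated cost of the next call
}

// Hypothetical in-memory stand-in for a per-lead budget.
class InMemoryLeadBudget {
  private readonly spent = new Map<string, number>();
  private readonly limitUsd: number;
  private readonly softCap: number;
  private readonly hardCap: number;

  constructor(limitUsd: number, softCap = 0.8, hardCap = 1.0) {
    this.limitUsd = limitUsd;
    this.softCap = softCap;
    this.hardCap = hardCap;
  }

  // Decide whether a call with the given estimated cost fits the budget.
  check(leadId: string, estimatedCost: number): BudgetCheck {
    const current = this.spent.get(leadId) ?? 0;
    const projected = current + estimatedCost;
    return {
      allowed: projected <= this.limitUsd * this.hardCap,
      softCapReached: current >= this.limitUsd * this.softCap,
      projected,
    };
  }

  // Add the actual cost of a completed call to the lead's running total.
  record(leadId: string, cost: number): void {
    this.spent.set(leadId, (this.spent.get(leadId) ?? 0) + cost);
  }
}

const budget = new InMemoryLeadBudget(0.5);
budget.record("lead-1", 0.45);
const nextCall = budget.check("lead-1", 0.1); // projected spend of about $0.55 is over the $0.50 cap
```

Here nextCall.allowed is false, and nextCall.softCapReached is true because $0.45 is already past the $0.40 soft cap.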
Step 6: Build the structured repair and Databricks services
The structured repair service converts raw LLM output (which can include markdown fences, string-coerced numbers, and missing fields) into clean typed data. Create src/services/repair-service.ts:
ts
import { repair, repairOutput, isValid, analyzeInput } from "@reaatech/structured-repair-core";
import { LeadProfileSchema } from "../lib/schemas.js";
import type { LeadProfile } from "../lib/schemas.js";

export async function extractLeadProfile(llmOutput: string): Promise<LeadProfile> {
  return repair(LeadProfileSchema, llmOutput);
}

export function extractLeadProfileWithDetails(llmOutput: string) {
  return repairOutput({
    schema: LeadProfileSchema,
    input: llmOutput,
    debug: false,
  });
}

export function isProfileValid(jsonString: string): boolean {
  return isValid(LeadProfileSchema, jsonString);
}

export function diagnoseInput(input: string) {
  return analyzeInput(input);
}
The Databricks service wraps your model serving endpoints. Create src/services/databricks-service.ts:
ts
import { WorkspaceClient, Config, ApiError } from "@databricks/sdk-experimental";
import type { LeadClassification } from "../lib/schemas.js";

export function createDatabricksClient() {
  const host = process.env.DATABRICKS_HOST;
  const token = process.env.DATABRICKS_TOKEN;
  if (!host || !token) {
    throw new Error("DATABRICKS_HOST and DATABRICKS_TOKEN must be set");
  }
  return new WorkspaceClient(new Config({ host, token }));
}

export async function callEndpoint(
  client: WorkspaceClient,
  endpointName: string,
  body: Record<string, unknown>,
) {
  return client.apiClient.request({
    path: `/serving-endpoints/${endpointName}/invocations`,
    method: "POST" as const,
    headers: new Headers(),
    raw: false,
    payload: body,
  });
}

export async function classifyLead(client: WorkspaceClient, message: string) {
  const response: unknown = await callEndpoint(client, "lead-classifier", { prompt: message });
  const text = typeof response === "string" ? response : JSON.stringify(response);
  return JSON.parse(text) as LeadClassification;
}

export async function extractLeadProfileRaw(client: WorkspaceClient, rawMessage: string) {
  const response: unknown = await callEndpoint(client, "lead-extractor", { prompt: rawMessage });
  if (typeof response === "string") return response;
  return JSON.stringify(response);
}

export function estimateLlmCost(promptChars: number, responseChars: number): number {
  const inputCostPerChar = 0.000000435;
  const outputCostPerChar = 0.00000087;
  return promptChars * inputCostPerChar + responseChars * outputCostPerChar;
}

export { ApiError };
Expected output: The Databricks client authenticates using DATABRICKS_HOST and DATABRICKS_TOKEN, then calls the lead-classifier and lead-extractor serving endpoints. The repair service picks up the raw response and produces a validated LeadProfile.
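For intuition about what "repair" means here, the sketch below handles two of the failure modes named above by hand: markdown fences around the JSON and numbers that arrive as strings. It is an illustration only and says nothing about how @reaatech/structured-repair-core works internally; LooseProfile, stripCodeFences, coerceNumber, and repairProfile are hypothetical names, and the field list is borrowed from the sample response shown later in this tutorial.

```typescript
// Hypothetical, simplified profile shape for this sketch.
interface LooseProfile {
  name?: string;
  email?: string;
  estimatedAssets?: number;
}

// Built at runtime so the sketch does not need a literal triple backtick.
const FENCE = "`".repeat(3);
const FENCE_PATTERN = new RegExp(FENCE + "(?:json)?\\s*([\\s\\S]*?)" + FENCE, "i");

// Return the text inside the first fenced block, or the trimmed input if there is none.
function stripCodeFences(raw: string): string {
  const match = raw.match(FENCE_PATTERN);
  return (match?.[1] ?? raw).trim();
}

// Accept numbers as-is and convert strings such as "$500,000" to 500000.
function coerceNumber(value: unknown): number | undefined {
  if (typeof value === "number" && Number.isFinite(value)) return value;
  if (typeof value === "string") {
    const cleaned = value.replace(/[$,\s]/g, "");
    if (cleaned === "") return undefined;
    const parsed = Number(cleaned);
    return Number.isFinite(parsed) ? parsed : undefined;
  }
  return undefined;
}

// Parse raw LLM output into a profile, keeping only fields with usable values.
function repairProfile(raw: string): LooseProfile {
  const parsed: unknown = JSON.parse(stripCodeFences(raw));
  if (typeof parsed !== "object" || parsed === null) {
    throw new Error("expected a JSON object");
  }
  const record = parsed as Record<string, unknown>;
  const profile: LooseProfile = {};
  const name = record["name"];
  const email = record["email"];
  const assets = coerceNumber(record["estimatedAssets"]);
  if (typeof name === "string") profile.name = name;
  if (typeof email === "string") profile.email = email;
  if (assets !== undefined) profile.estimatedAssets = assets;
  return profile;
}
```

A real repair layer would also validate against the Zod schema and report which fixes it applied; this sketch only shows the normalization step.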
Step 7: Build the DynamoDB session service
Session continuity preserves conversation context across touchpoints. The DynamoDbStorageAdapter implements the IStorageAdapter interface from @reaatech/session-continuity. Create src/lib/dynamodb-adapter.ts:
ts
import {
  DynamoDBClient,
  PutItemCommand,
  GetItemCommand,
  QueryCommand,
  UpdateItemCommand,
  DeleteItemCommand,
  ListTablesCommand,
  type AttributeValue,
} from "@aws-sdk/client-dynamodb";
import type {
  IStorageAdapter,
  Session,
  Message,
  SessionId,
  MessageId,
  HealthStatus,
  UpdateSessionOptions,
} from "@reaatech/session-continuity";

const DYNAMODB_SESSIONS_TABLE = process.env.DYNAMODB_SESSIONS_TABLE ?? "lead-sessions";

function toSessionId(id: string): SessionId {
  return id;
}

function toMessageId(id: string): MessageId {
Now create src/services/session-service.ts — it wraps the adapter in a SessionManager with token budgeting:
ts
import { SessionManager } from "@reaatech/session-continuity";
import type { TokenCounter, Message } from "@reaatech/session-continuity";
import DynamoDbStorageAdapter from "../lib/dynamodb-adapter.js";

export class CharTokenCounter implements TokenCounter {
  readonly model = "char-estimate";
  readonly tokenizer = "char-count-4";

  count(text: string): number {
    return Math.ceil(text.length / 4);
  }

  countMessages(messages: Message[]): number {
    let total = 0;
    for (const msg of messages) {
      const content = msg.content;
      total += this.count(typeof content === "string" ? content : JSON.stringify(content));
    }
    return total;
  }
}

export function createSessionManager() {
  return new SessionManager({
    storage: new DynamoDbStorageAdapter(),
    tokenCounter: new CharTokenCounter(),
    tokenBudget: {
      maxTokens: 4096,
      reserveTokens: 500,
      overflowStrategy: "compress" as const,
    },
    compression: {
      strategy: "sliding_window" as const,
      targetTokens: 3500,
    },
  });
}
Expected output: The adapter stores sessions in DynamoDB with an { id, data } schema. Messages go to a lead-sessions_messages table. The updateSession method only applies optimistic locking (ConditionExpression) when expectedVersion is explicitly provided — no spurious ValidationException when versioning isn’t needed.
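The token budgeting configured above combines a rough character-based token estimate with a sliding window. As a rough illustration of that combination (and not the SessionManager's actual algorithm), the sketch below keeps the most recent messages that fit under a target; ChatMessage, estimateTokens, and slidingWindow are hypothetical names.

```typescript
interface ChatMessage {
  role: "user" | "assistant";
  content: string;
}

// Same heuristic as CharTokenCounter: roughly four characters per token.
function estimateTokens(text: string): number {
  return Math.ceil(text.length / 4);
}

// Walk backwards from the newest message and keep as many as fit under the
// target, preserving chronological order in the result.
function slidingWindow(messages: ChatMessage[], targetTokens: number): ChatMessage[] {
  const kept: ChatMessage[] = [];
  let total = 0;
  for (let i = messages.length - 1; i >= 0; i--) {
    const message = messages[i];
    if (message === undefined) continue;
    const cost = estimateTokens(message.content);
    if (total + cost > targetTokens) break;
    kept.unshift(message);
    total += cost;
  }
  return kept;
}
```

With three 40-character messages (about 10 tokens each) and a target of 25 tokens, the oldest message is dropped and the two most recent are kept.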
Step 8: Build the HubSpot and handoff services
The HubSpot service manages contacts, deals, and engagements. Create src/services/hubspot-service.ts:
The handoff service orchestrates the HubSpot calls via @reaatech/agent-handoff. It also fires typed events so you can react to scheduling results. Create src/services/handoff-service.ts:
ts
import {
  createHandoffConfig,
  TypedEventEmitter,
  withRetry,
  HandoffError,
} from "@reaatech/agent-handoff";
import type { HandoffPayload, HandoffConfig } from "@reaatech/agent-handoff";
import type { LeadProfile, LeadClassification } from "../lib/schemas.js";

const emitter = new TypedEventEmitter<{
  scheduled: { leadId: string };
  failed: { leadId: string; error: Error };
}>();

export function createHandoffConfigFromEnv(): HandoffConfig {
  return
Expected output: handoffToScheduler calls HubSpot’s search API to find or create a contact, creates a deal associated with that contact, and schedules a follow-up engagement task. The HubSpot contactId is returned and included in the SchedulingResult.
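The typed events are what let other code react to scheduling results. The sketch below shows the general pattern with a tiny hand-rolled emitter; it is not the TypedEventEmitter exported by @reaatech/agent-handoff, whose method names may differ. TinyTypedEmitter and its on and emit methods are assumptions made for illustration, while the event map matches the one declared in the handoff service.

```typescript
// Event map matching the one declared in the handoff service.
type HandoffEvents = {
  scheduled: { leadId: string };
  failed: { leadId: string; error: Error };
};

// Hypothetical minimal emitter: listeners are stored per event name, and the
// generic signatures tie each event name to its payload type at compile time.
class TinyTypedEmitter<Events extends Record<string, unknown>> {
  private readonly listeners = new Map<keyof Events, Array<(payload: unknown) => void>>();

  on<K extends keyof Events>(event: K, listener: (payload: Events[K]) => void): void {
    const list = this.listeners.get(event) ?? [];
    list.push(listener as (payload: unknown) => void);
    this.listeners.set(event, list);
  }

  emit<K extends keyof Events>(event: K, payload: Events[K]): void {
    for (const listener of this.listeners.get(event) ?? []) {
      listener(payload);
    }
  }
}

const handoffEvents = new TinyTypedEmitter<HandoffEvents>();
const scheduledLeads: string[] = [];

// A listener like this is where a notification hook could go.
handoffEvents.on("scheduled", (event) => {
  scheduledLeads.push(event.leadId);
});

handoffEvents.emit("scheduled", { leadId: "lead-1" });
handoffEvents.emit("failed", { leadId: "lead-2", error: new Error("HubSpot timeout") });
```

Emitting "failed" with no registered listener is a no-op here, so only lead-1 ends up in scheduledLeads.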
Step 9: Build the observability service
Langfuse traces every step of the pipeline so you can debug individual leads. Create src/services/observability-service.ts:
ts
import { Langfuse } from "langfuse";
import type { LeadProcessingResult } from "../lib/types.js";

export function createLangfuseClient() {
  const publicKey = process.env.LANGFUSE_PUBLIC_KEY;
  const secretKey = process.env.LANGFUSE_SECRET_KEY;
  const baseUrl = process.env.LANGFUSE_BASE_URL;
  if (!publicKey || !secretKey || !baseUrl) {
    throw new Error("LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, and LANGFUSE_BASE_URL must be set");
  }
  return new Langfuse({ publicKey, secretKey, baseUrl });
}

export function startLeadTrace(
  langfuse: Langfuse,
  leadId: string,
  inboundMessage: string,
) {
  return langfuse.trace({
    id: leadId,
    name: "lead-intake",
    input: { message: inboundMessage },
  });
}

export function recordSpan(
  trace: ReturnType<Langfuse["trace"]>,
  spanName: string,
  input: unknown,
  output: unknown,
) {
  return trace.span({ name: spanName, input, output });
}

export async function endLeadTrace(
  langfuse: Langfuse,
  trace: ReturnType<Langfuse["trace"]>,
  result: LeadProcessingResult,
) {
  trace.update({ output: result });
  await langfuse.flushAsync();
}

export function recordErrorSpan(
  trace: ReturnType<Langfuse["trace"]>,
  spanName: string,
  error: Error,
) {
  return trace.span({
    name: spanName,
    level: "ERROR",
    statusMessage: error.message,
  });
}
Expected output: Each lead generates a trace named lead-intake with spans for budget-check, classify-lead, extract-profile, session-create, and handoff-scheduler. Errors get level: "ERROR" spans with the error message.
Step 10: Build the lead processing orchestrator
This is the heart of the application. It wires every service together into a pipeline. Create src/services/lead-processor.ts:
ts
import pRetry, { AbortError } from "p-retry";
import { defineLeadBudget, checkLeadBudget, recordLeadSpend } from "./budget-service.js";
import { createRouter, registerLeadKeywords, classifyLeadIntent } from "./confidence-router-service.js";
import { extractLeadProfile, isProfileValid } from "./repair-service.js";
import { createSessionManager } from "./session-service.js";
import { handoffToScheduler, buildHandoffPayload } from "./handoff-service.js";
import { createDatabricksClient, extractLeadProfileRaw, estimateLlmCost } from "./databricks-service.js";
import {
  createLangfuseClient,
  startLeadTrace,
  recordSpan,
  endLeadTrace,
  recordErrorSpan,
} from "./observability-service.js";
import type { LeadInbound, LeadProfile } from "../lib/schemas.js";
import type { LeadProcessingResult }
Expected output: The pipeline runs through 6 stages — budget check, classification, extraction, session creation, handoff, and spend recording. Each stage emits a Langfuse span. The three-way decision tree returns ROUTE, CLARIFY, or FALLBACK at every exit point.
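The control flow of such a pipeline can be sketched with the services replaced by injected functions. This is a simplified, synchronous illustration under stated assumptions, not the orchestrator from the recipe: PipelineDeps and runPipeline are hypothetical, the real stages are asynchronous, the exit conditions (for example, CLARIFY when no email could be extracted) are guesses, and "record-spend" is an invented stage label. The other stage labels reuse the span names from Step 9.

```typescript
type Decision = "ROUTE" | "CLARIFY" | "FALLBACK";

interface PipelineResult {
  leadId: string;
  decision: Decision;
  stages: string[]; // stages that ran, in order
}

// Hypothetical stage functions; the real services are async and richer.
interface PipelineDeps {
  budgetAllows(leadId: string): boolean;
  classify(message: string): number; // confidence in [0, 1]
  extract(message: string): { email?: string };
  createSession(leadId: string): void;
  handoff(leadId: string): boolean; // true when a follow-up was scheduled
  recordSpend(leadId: string): void;
}

function runPipeline(leadId: string, message: string, deps: PipelineDeps): PipelineResult {
  const stages: string[] = [];
  const finish = (decision: Decision): PipelineResult => ({ leadId, decision, stages });

  stages.push("budget-check");
  if (!deps.budgetAllows(leadId)) return finish("FALLBACK");

  stages.push("classify-lead");
  const confidence = deps.classify(message);
  if (confidence < 0.3) return finish("FALLBACK"); // below the fallback threshold
  if (confidence < 0.8) return finish("CLARIFY");  // between the two thresholds

  stages.push("extract-profile");
  const profile = deps.extract(message);
  if (!profile.email) return finish("CLARIFY"); // assumption: no contact details means ask again

  stages.push("session-create");
  deps.createSession(leadId);

  stages.push("handoff-scheduler");
  const scheduled = deps.handoff(leadId);

  stages.push("record-spend");
  deps.recordSpend(leadId);

  return finish(scheduled ? "ROUTE" : "FALLBACK");
}
```

Injecting the stages this way also makes each exit path easy to test with stubs, which is the same approach the mocked integration tests in Step 12 take.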
Step 11: Wire up the API route and home page
The API route accepts POST requests and returns the processing result. Create app/api/lead/route.ts:
Note the import path: since app/ lives at the project root and your services are under src/, the relative import goes up three levels (../../../src/services/lead-processor.js). If your scaffold placed app/ inside src/, adjust accordingly.
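As a rough guide to the shape of such a route, the sketch below validates the JSON body by hand and maps failures to HTTP status codes using the web-standard Request and Response objects that App Router handlers receive and return. It is a sketch under assumptions rather than the recipe's file: parseLeadBody and jsonResponse are hypothetical helpers, processLead is a placeholder for the orchestrator call, and the real route presumably validates with the Zod schemas from Step 3.

```typescript
interface LeadRequestBody {
  message: string;
  source: string;
  contactEmail?: string;
  contactName?: string;
}

// Hand-rolled validation for the sketch; throws with a readable message.
function parseLeadBody(body: unknown): LeadRequestBody {
  if (typeof body !== "object" || body === null) {
    throw new Error("body must be a JSON object");
  }
  const record = body as Record<string, unknown>;
  const message = record["message"];
  const source = record["source"];
  const contactEmail = record["contactEmail"];
  const contactName = record["contactName"];
  if (typeof message !== "string" || message.trim().length === 0) {
    throw new Error("message is required");
  }
  if (typeof source !== "string" || source.length === 0) {
    throw new Error("source is required");
  }
  const parsed: LeadRequestBody = { message, source };
  if (typeof contactEmail === "string") parsed.contactEmail = contactEmail;
  if (typeof contactName === "string") parsed.contactName = contactName;
  return parsed;
}

function jsonResponse(data: unknown, status = 200): Response {
  return new Response(JSON.stringify(data), {
    status,
    headers: { "Content-Type": "application/json" },
  });
}

// Placeholder for the orchestrator; the real route would call into lead-processor.
async function processLead(lead: LeadRequestBody): Promise<{ leadId: string; decision: string; source: string }> {
  return { leadId: "lead-demo", decision: "CLARIFY", source: lead.source };
}

// Route handler: 400 for malformed input, 500 for processing failures.
export async function POST(request: Request): Promise<Response> {
  let lead: LeadRequestBody;
  try {
    lead = parseLeadBody(await request.json());
  } catch (error) {
    const message = error instanceof Error ? error.message : "invalid request";
    return jsonResponse({ error: message }, 400);
  }
  try {
    return jsonResponse(await processLead(lead));
  } catch (error) {
    const message = error instanceof Error ? error.message : "unknown error";
    return jsonResponse({ error: message }, 500);
  }
}
```

Separating the two try blocks keeps client mistakes (400) distinct from pipeline failures (500), which makes the Langfuse error spans easier to interpret.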
Update app/page.tsx with a simple landing page that documents the API:
tsx
export default function Home() {
  return (
    <main style={{ maxWidth: 700, margin: "40px auto", padding: "0 20px", fontFamily: "system-ui, sans-serif" }}>
      <h1>Lead Intake Agent</h1>
      <p>
        Qualify, score, and route prospective client inquiries from web forms, email, or chat,
        automatically scheduling follow-up calls for high-value leads.
      </p>
      <hr />
      <h2>API Usage</h2>
      <p>Submit a lead via <code>POST /api/lead</code> with JSON body:</p>
      <pre style={{ background: "#f5f5f5", padding: 16, borderRadius: 8, overflow: "auto" }}>
{`{
  "message": "I have $500k to invest and want to schedule a call",
  "source": "web_form",
  "contactEmail": "investor@example.com",
  "contactName": "Jane Doe"
}`}
      </pre>
      <p>Response:</p>
      <pre style={{ background: "#f5f5f5", padding: 16, borderRadius: 8, overflow: "auto" }}>
{`{
  "leadId": "...",
  "decision": "ROUTE",
  "profile": {
    "name": "Jane Doe",
    "email": "investor@example.com",
    "estimatedAssets": 500000,
    "status": "qualified"
  },
  "schedulingResult": { "scheduled": true }
}`}
      </pre>
      <h3>cURL Example</h3>
      <pre style={{ background: "#f5f5f5", padding: 16, borderRadius: 8, overflow: "auto" }}>
{`curl -X POST http://localhost:3000/api/lead \\
  -H "Content-Type: application/json" \\
  -d '{"message":"I want to schedule a meeting","source":"web_form"}'`}
      </pre>
    </main>
  );
}
Now update src/index.ts to re-export the public API:
ts
export { processInboundLead, processInboundLeadWithRetry } from "./services/lead-processor.js";
export type { LeadInbound, LeadProfile } from "./lib/schemas.js";
export type { LeadProcessingResult } from "./lib/types.js";
Expected output: Starting the dev server (pnpm dev) and hitting curl -X POST http://localhost:3000/api/lead -H "Content-Type: application/json" -d '{"message":"I want a meeting","source":"web_form"}' returns a 200 response with { "leadId": "..." , "decision": "ROUTE", ... }.
Step 12: Write tests and verify quality gates
The project requires at least 90% line/branch/function/statement coverage on runtime code. Create tests/lib/schemas.test.ts to validate the Zod schemas:
Create tests/app/api/lead/route.test.ts for the API route handler:
ts
import { describe, it, expect, vi } from "vitest";
import { NextRequest } from "next/server";

vi.mock("../../../../src/services/lead-processor.js", () => ({
  processInboundLead: vi.fn().mockResolvedValue({
    leadId: "lead-1",
    decision: "ROUTE",
    schedulingResult: { scheduled: true },
  }),
  processInboundLeadWithRetry: vi.fn().mockImplementation((inbound: { message: string; source: string }) => {
    if (!inbound.message || inbound.message.trim().length === 0) throw new Error("validation error"
Create tests/services/lead-processor.test.ts as the integration test for the orchestrator, mocking all downstream services. Below is a representative subset; the full file includes additional edge-case tests for non-Error throws, session failures, and target-type fallback:
Expected output: TypeScript compiles cleanly, ESLint passes with no banned comments, and vitest reports numFailedTests: 0 with coverage >= 90% on lines, branches, functions, and statements for src/ and app/**/route.ts.
Next steps
Add a webhook receiver — wire up the confidence router to accept incoming emails or chat webhooks directly, so the agent runs without user interaction
Implement lead scoring — extend the classifier with an LLM-based scoring layer that grades leads by estimated AUM, urgency, and fit to your advisory firm’s ideal client profile
Add Slack notifications — listen to the handoff emitter’s scheduled and failed events and post alerts to a Slack channel so advisors know when a new qualified lead is ready
Build a dashboard — visualize pipeline metrics (leads by decision type, average processing time, budget utilization per lead) using the Langfuse trace data
ts
// Continuation of src/lib/dynamodb-adapter.ts from Step 7
  return id;
}

function s(value: string): AttributeValue {
  return { S: value };
}

function jsonAttr(value: unknown): AttributeValue {
  return { S: JSON.stringify(value) };
}

class DynamoDbStorageAdapter implements IStorageAdapter {
  private client: DynamoDBClient;

  constructor() {
    this.client = new DynamoDBClient({ region: process.env.AWS_REGION });