Databricks RAG Pipeline for Insurance Policy Analysis
A retrieval‑augmented generation service that lets small insurance agencies query policy documents with natural language, backed by Databricks LLMs and pgvector.
A complete, working implementation of this recipe — downloadable as a zip or browsable file by file. Generated by our build pipeline; tested with full coverage before publishing.
This tutorial walks you through building a retrieval-augmented generation pipeline for insurance policy analysis using Next.js 16 and the App Router. You’ll build a service that lets insurance brokers query policy PDFs with natural language: upload a document, have it chunked and embedded via VoyageAI, stored in pgvector (PostgreSQL), and then ask questions answered by a Databricks-hosted LLM with semantic caching, context-window planning, structured output repair, and per-tenant cost telemetry.
Prerequisites
Node.js 22+ and pnpm 10 installed on your machine
A Databricks workspace with a model serving endpoint (e.g., databricks-dbrx-instruct) and a personal access token (PAT)
A VoyageAI API key for embedding generation
A PostgreSQL database with the pgvector extension installed
A Langfuse account (for LLM observability) — optional but recommended
Basic familiarity with TypeScript and Next.js App Router conventions
Step 1: Scaffold the project and configure environment variables
The scaffold provides Next.js 16 with the App Router, all dependencies pinned to exact versions in package.json, and a src/ directory for your service code. The key packages you’ll use are:
@databricks/sdk-experimental for Databricks authentication and API calls
@reaatech/agent-memory-storage for pgvector-backed chunk persistence
@reaatech/agent-memory-retrieval for hybrid retrieval strategies
@reaatech/context-window-planner for packing chunks within token budgets
@reaatech/llm-cache for semantic caching of LLM responses
@reaatech/structured-repair-core for repairing malformed JSON output
@reaatech/llm-cost-telemetry for per-tenant usage tracking
voyageai for text embeddings
fastembed as a local embedding fallback
pdfjs-dist for PDF text extraction
pgvector for vector database operations
postgres for raw SQL
zod for request validation and schema definitions
langfuse for LLM observability tracing
Copy .env.example to .env and replace the placeholders with your credentials:
env
# Env vars used by databricks-rag-pipeline-for-insurance-policy-analysis.
# The builder adds entries here as it wires up each integration.
# Keep placeholders only. Never commit real values.
NODE_ENV=development

# Databricks auth (for WorkspaceClient)
DATABRICKS_HOST=https://your-workspace.cloud.databricks.com
DATABRICKS_TOKEN=dapi...
DATABRICKS_CLIENT_ID=
DATABRICKS_CLIENT_SECRET=
DATABRICKS_SERVING_ENDPOINT=databricks-dbrx-instruct

# VoyageAI embeddings
VOYAGE_API_KEY=<your-voyage-api-key>

# PostgreSQL with pgvector
PGHOST=localhost
PGPORT=5432
PGDATABASE=insurance_policies
PGUSER=postgres
PGPASSWORD=<your-db-password>

# Langfuse observability
LANGFUSE_PUBLIC_KEY=<your-langfuse-public-key>
LANGFUSE_SECRET_KEY=<your-langfuse-secret-key>
LANGFUSE_HOST=https://cloud.langfuse.com

# LLM cache tuning
LLM_CACHE_SIMILARITY_THRESHOLD=0.8
LLM_CACHE_TTL_SECONDS=3600

# Cost telemetry (from @reaatech/llm-cost-telemetry)
DEFAULT_DAILY_BUDGET=100.0
Expected output: Your .env file now has all the integration credentials the pipeline needs.
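These variables could be read into a typed object once at startup. The sketch below is one way to do that, not the project's actual src/lib/config.ts: `loadConfig`, `numberFromEnv`, and the `AppConfig` shape are hypothetical names, and the defaults simply mirror the placeholders above.

```typescript
// Hypothetical config loader; names and shape are illustrative only.
interface AppConfig {
  database: { host: string; port: number; database: string; user: string; password: string };
  cache: { similarityThreshold: number; ttlSeconds: number };
  dailyBudget: number;
}

type Env = Record<string, string | undefined>;

// Parse a numeric variable: use the fallback when it is unset, and fail fast
// when it is set to something that is not a number.
function numberFromEnv(env: Env, key: string, fallback: number): number {
  const raw = env[key];
  if (raw === undefined || raw === "") return fallback;
  const value = Number(raw);
  if (Number.isNaN(value)) throw new Error(`${key} must be a number, got "${raw}"`);
  return value;
}

function loadConfig(env: Env): AppConfig {
  return {
    database: {
      host: env["PGHOST"] ?? "localhost",
      port: numberFromEnv(env, "PGPORT", 5432),
      database: env["PGDATABASE"] ?? "insurance_policies",
      user: env["PGUSER"] ?? "postgres",
      password: env["PGPASSWORD"] ?? "",
    },
    cache: {
      similarityThreshold: numberFromEnv(env, "LLM_CACHE_SIMILARITY_THRESHOLD", 0.8),
      ttlSeconds: numberFromEnv(env, "LLM_CACHE_TTL_SECONDS", 3600),
    },
    dailyBudget: numberFromEnv(env, "DEFAULT_DAILY_BUDGET", 100),
  };
}
```

In a Node.js process you would typically call `loadConfig(process.env)` once and pass the result around, so a malformed value fails at boot rather than on the first request.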
Step 2: Define shared types and configuration
Create src/types/index.ts with the domain models and API response types that flow through the pipeline:
Exponential backoff retry with 3 attempts for transient VoyageAI failures (rate limits, 5xx errors)
Graceful fallback to a local BGE embedding model if VoyageAI is unavailable
Separate methods for single-text (embedText) and batch (embedBatch) embedding, since the ingestion pipeline processes paragraphs in bulk while the query pipeline only needs one embedding
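The retry-then-fallback behaviour described in these points can be sketched as follows. This is an illustration under assumptions, not the tutorial's embedding service: `embedWithRetry`, `backoffDelayMs`, and the 200 ms base delay are hypothetical, the two embedders are passed in as plain functions, and for brevity the sketch retries on any error rather than only on rate limits and 5xx responses.

```typescript
type EmbedFn = (text: string) => Promise<number[]>;

// Delay before the retry that follows attempt `attempt` (0-based):
// base, 2 x base, 4 x base, ...
function backoffDelayMs(attempt: number, baseMs: number): number {
  return baseMs * 2 ** attempt;
}

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Try `primary` up to `maxAttempts` times, then hand over to `fallback`.
async function embedWithRetry(
  text: string,
  primary: EmbedFn,
  fallback: EmbedFn,
  maxAttempts = 3,
  baseMs = 200,
): Promise<number[]> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await primary(text);
    } catch {
      // Wait before the next attempt, but not after the last one.
      if (attempt < maxAttempts - 1) {
        await sleep(backoffDelayMs(attempt, baseMs));
      }
    }
  }
  // All attempts failed: use the local model instead of surfacing the error.
  return fallback(text);
}
```

Injecting both embedders as functions keeps the retry logic testable without network access; in the real service `primary` would wrap the VoyageAI client and `fallback` the local BGE model.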
Expected output: Your terminal shows no import errors. The service initializes and falls back gracefully if fastembed is missing.
Step 5: Build the storage service with PostgresMemoryStorage
Create src/services/storage-service.ts — the persistence layer that wraps @reaatech/agent-memory-storage to store policy chunks in pgvector:
ts
import pgvector from "pgvector";
import {
  PostgresMemoryStorage,
  InMemoryMemoryStorage,
  MemoryQuery,
} from "@reaatech/agent-memory-storage";
import {
  MemoryType,
  MemoryImportance,
  MemorySource,
  MemoryLifecycle,
  type Memory,
} from "@reaatech/agent-memory-core";
import { config } from "../lib/config.js";
import type { PolicyChunk } from "../types/index.js";

export class StorageService {
  private store: PostgresMemoryStorage | InMemoryMemoryStorage | null =
Expected output: The storage service handles single-chunk inserts, batch inserts, similarity search scoped to a tenant, and rollback deletion on failure.
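For intuition about what a tenant-scoped similarity search amounts to at the SQL level, here is a rough sketch. The storage package's real query is not shown in this tutorial, so the table name `policy_chunks`, its columns, and the `buildTenantSearch` helper are all hypothetical. The only pgvector feature relied on is the `<=>` cosine-distance operator.

```typescript
interface SqlQuery {
  text: string;
  values: (string | number)[];
}

// pgvector accepts vectors in text form, e.g. "[0.1,0.2,0.3]".
function toVectorLiteral(embedding: number[]): string {
  return `[${embedding.join(",")}]`;
}

// Build a parameterized query that ranks one tenant's chunks by cosine
// similarity to the query embedding. Table and column names are hypothetical.
function buildTenantSearch(tenantId: string, embedding: number[], limit = 5): SqlQuery {
  return {
    text:
      "SELECT id, content, 1 - (embedding <=> $1::vector) AS similarity " +
      "FROM policy_chunks WHERE tenant_id = $2 " +
      "ORDER BY embedding <=> $1::vector LIMIT $3",
    values: [toVectorLiteral(embedding), tenantId, limit],
  };
}
```

Filtering on the tenant column inside the query, rather than after retrieval, is what keeps one agency's policy text out of another agency's results.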
Step 6: Build the context window planner
Create src/lib/context-planner.ts — this module uses @reaatech/context-window-planner to pack chunks, system instructions, and the user query into a prompt that fits within Databricks’ 128K token context window:
Expected output: The planner allocates 40% of the 128K budget to RAG chunks, reserves 4,096 tokens for overhead, and drops lowest-relevance chunks when the budget is exceeded.
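The budget arithmetic can be illustrated with a small self-contained sketch. It does not use the @reaatech/context-window-planner API. It assumes "128K" means 128,000 tokens, that the 40% share is taken from the full window, and that tokens can be approximated as four characters each. `planBudget`, `packChunks`, and `estimateTokens` are hypothetical names.

```typescript
interface RankedChunk {
  id: string;
  text: string;
  score: number; // retrieval relevance, higher is better
}

const CONTEXT_WINDOW_TOKENS = 128_000; // assumed reading of "128K"
const OVERHEAD_TOKENS = 4_096;
const RAG_SHARE_PERCENT = 40;

// Rough estimate (about 4 characters per token); an assumption of this
// sketch, not the planner package's tokenizer.
function estimateTokens(text: string): number {
  return Math.ceil(text.length / 4);
}

// Split the window into the RAG share, the fixed overhead, and the rest
// (system instructions, user query, model response).
function planBudget(): { rag: number; overhead: number; remaining: number } {
  const rag = Math.floor((CONTEXT_WINDOW_TOKENS * RAG_SHARE_PERCENT) / 100);
  return {
    rag,
    overhead: OVERHEAD_TOKENS,
    remaining: CONTEXT_WINDOW_TOKENS - rag - OVERHEAD_TOKENS,
  };
}

// Keep chunks in descending relevance until the next one would exceed the
// budget; that chunk and everything ranked below it are dropped.
function packChunks(chunks: RankedChunk[], budget: number): RankedChunk[] {
  const byScore = [...chunks].sort((a, b) => b.score - a.score);
  const kept: RankedChunk[] = [];
  let used = 0;
  for (const chunk of byScore) {
    const cost = estimateTokens(chunk.text);
    if (used + cost > budget) break;
    kept.push(chunk);
    used += cost;
  }
  return kept;
}
```

Under these assumptions the RAG share comes to 51,200 tokens, leaving 72,704 tokens after the 4,096-token overhead for instructions, the question, and the answer.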
Step 7: Build the Databricks LLM service
Create src/services/databricks-service.ts — this authenticates with Databricks and calls the model serving endpoint with an OpenAI-compatible chat-completions request:
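As a rough illustration of what such a request looks like on the wire, the sketch below builds one by hand instead of going through @databricks/sdk-experimental. It assumes PAT authentication and the standard `/serving-endpoints/<name>/invocations` path; `buildChatRequest` and its default `maxTokens` are hypothetical and not part of the tutorial's service.

```typescript
interface ChatMessage {
  role: "system" | "user" | "assistant";
  content: string;
}

interface ChatRequest {
  url: string;
  init: { method: "POST"; headers: Record<string, string>; body: string };
}

// Build (but do not send) an OpenAI-style chat-completions request for a
// Databricks model serving endpoint.
function buildChatRequest(
  host: string,
  token: string,
  endpoint: string,
  messages: ChatMessage[],
  maxTokens = 1024,
): ChatRequest {
  const base = host.replace(/\/+$/, ""); // tolerate a trailing slash in DATABRICKS_HOST
  return {
    url: `${base}/serving-endpoints/${encodeURIComponent(endpoint)}/invocations`,
    init: {
      method: "POST",
      headers: {
        Authorization: `Bearer ${token}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({ messages, max_tokens: maxTokens, temperature: 0 }),
    },
  };
}
```

Passing `req.url` and `req.init` to `fetch` would send the request; in the OpenAI-compatible response format the answer text is found at `choices[0].message.content`.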
Expected output: LLM outputs that come back wrapped in markdown code fences or with minor formatting issues are automatically repaired into valid typed objects. Every call records a cost span and traces to Langfuse.
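To make the repair idea concrete, here is a dependency-free sketch that strips a surrounding code fence, parses the JSON, and coerces it into the answer shape the system prompt asks for (answer, sources, confidence, coverageGaps). It stands in for, and is much simpler than, the @reaatech/structured-repair-core and Zod setup the tutorial uses. `repairAnswer`, `stripCodeFence`, and the chosen defaults (empty arrays, confidence clamped to 0-1) are assumptions of this sketch.

```typescript
interface PolicyAnswer {
  answer: string;
  sources: string[];
  confidence: number;
  coverageGaps: string[];
}

// Three backticks, built at runtime so this snippet can itself sit in a fence.
const FENCE = "`".repeat(3);
const FENCED = new RegExp("^" + FENCE + "(?:json)?\\s*([\\s\\S]*?)\\s*" + FENCE + "$", "i");

// Remove a surrounding markdown code fence if the model added one.
function stripCodeFence(raw: string): string {
  const trimmed = raw.trim();
  const inner = trimmed.match(FENCED)?.[1];
  return inner !== undefined ? inner : trimmed;
}

function isStringArray(value: unknown): value is string[] {
  return Array.isArray(value) && value.every((item) => typeof item === "string");
}

// Returns a typed answer, or null when the output cannot be salvaged.
function repairAnswer(raw: string): PolicyAnswer | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(stripCodeFence(raw));
  } catch {
    return null;
  }
  if (typeof parsed !== "object" || parsed === null) return null;
  const obj = parsed as Record<string, unknown>;

  const answer = obj["answer"];
  if (typeof answer !== "string") return null; // nothing useful without an answer

  const sources = obj["sources"];
  const gaps = obj["coverageGaps"];
  const confidence = obj["confidence"];
  return {
    answer,
    sources: isStringArray(sources) ? sources : [],
    // Clamp out-of-range values; default to 0 when missing or not a number.
    confidence: typeof confidence === "number" ? Math.min(1, Math.max(0, confidence)) : 0,
    coverageGaps: isStringArray(gaps) ? gaps : [],
  };
}
```

Returning `null` instead of throwing lets the caller decide on a fallback response when repair fails.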
Step 10: Build the ingestion pipeline
Create src/services/ingestion-service.ts — this is the document ingestion pipeline. It extracts text from PDFs using pdfjs-dist, splits into chunks with overlap, embeds via the embedding service, and persists via storage:
ts
import { getDocument } from "pdfjs-dist";
import { StorageService } from "./storage-service.js";
import { EmbeddingService } from "./embedding-service.js";
import { recordLlmCall } from "../lib/cost-telemetry.js";
import { traceLlmCall } from "../lib/observability.js";
import {
  type PolicyMetadata,
  type PolicyChunk,
  type IngestionResult,
  PdfParseError,
} from "../types/index.js";

interface LangfuseInstance {
  trace?: (params: Record<string, unknown>) => { update
Expected output: A PDF upload flows through text extraction → paragraph chunking (max 1,000 chars, 100-char overlap) → batch embedding → pgvector storage, with cost telemetry and Langfuse tracing. If embedding fails partway through, previously stored chunks are rolled back.
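The chunking step can be sketched in isolation. The version below is a sliding window that prefers to end each chunk at a paragraph break, which is one plausible reading of "paragraph chunking with overlap" and not necessarily how the ingestion service implements it. `chunkText` is a hypothetical name; the 1,000 and 100 character defaults come from the description above.

```typescript
// Split text into chunks of at most `maxChars`, each starting `overlap`
// characters before the previous chunk ended.
function chunkText(text: string, maxChars = 1000, overlap = 100): string[] {
  if (overlap >= maxChars) {
    throw new Error("overlap must be smaller than maxChars");
  }
  const clean = text.trim();
  const chunks: string[] = [];
  let start = 0;
  while (start < clean.length) {
    let end = Math.min(start + maxChars, clean.length);
    if (end < clean.length) {
      // Prefer the last paragraph break inside the window, provided the
      // chunk would still move past the overlap region.
      const paragraphBreak = clean.lastIndexOf("\n\n", end);
      if (paragraphBreak > start + overlap) end = paragraphBreak;
    }
    const chunk = clean.slice(start, end).trim();
    if (chunk.length > 0) chunks.push(chunk);
    if (end >= clean.length) break;
    start = end - overlap; // step back so consecutive chunks share context
  }
  return chunks;
}
```

The overlap means a clause that straddles a chunk boundary still appears whole in at least one chunk, at the cost of embedding roughly 10% more text.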
Step 11: Build the query pipeline (the RAG loop)
Create src/services/query-service.ts — the core RAG loop: check cache, embed the question, retrieve relevant chunks, plan context, call Databricks LLM, repair the output, cache the result, and record telemetry:
ts
import { CacheService } from "./cache-service.js";
import { EmbeddingService } from "./embedding-service.js";
import { StorageService } from "./storage-service.js";
import { DatabricksService } from "./databricks-service.js";
import { ContextPlanner } from "../lib/context-planner.js";
import { repairLlmOutput as repairLlmOutputBase } from "../lib/repair.js";
import { recordLlmCall as recordLlmCallBase, checkBudget } from "../lib/cost-telemetry.js";
import { traceLlmCall as traceLlmCallBase } from "../lib/observability.js";
import type { QueryResponse } from "../types/index.js";

interface
Expected output: Each query runs through the complete 9-step pipeline: cache check → budget check → embedding → retrieval → context planning → LLM call → output repair → cache write → telemetry. Every early exit or failure (cache hit, empty chunks, LLM unavailable, repair failure) returns a well-formed response instead of crashing.
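The first step of the loop, the semantic cache check, is worth seeing in miniature. The sketch below is not the @reaatech/llm-cache API. It keeps entries in a plain array and treats a stored question as a hit when its embedding's cosine similarity to the new question meets the threshold. `cacheSet`, `cacheGet`, and `cosineSimilarity` are hypothetical helpers; the 0.8 threshold and 3,600-second TTL defaults are taken from .env.example.

```typescript
interface CacheEntry<T> {
  embedding: number[];
  value: T;
  expiresAt: number; // epoch milliseconds
}

function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length === 0 || a.length !== b.length) return 0;
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    const x = a[i] ?? 0;
    const y = b[i] ?? 0;
    dot += x * y;
    normA += x * x;
    normB += y * y;
  }
  if (normA === 0 || normB === 0) return 0;
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

function cacheSet<T>(
  entries: CacheEntry<T>[],
  embedding: number[],
  value: T,
  now: number,
  ttlSeconds = 3600,
): void {
  entries.push({ embedding, value, expiresAt: now + ttlSeconds * 1000 });
}

// Return the value of the most similar unexpired entry at or above the
// threshold, or undefined on a cache miss.
function cacheGet<T>(
  entries: CacheEntry<T>[],
  embedding: number[],
  now: number,
  threshold = 0.8,
): T | undefined {
  let best: CacheEntry<T> | undefined;
  let bestScore = threshold;
  for (const entry of entries) {
    if (entry.expiresAt <= now) continue; // expired
    const score = cosineSimilarity(embedding, entry.embedding);
    if (score >= bestScore) {
      best = entry;
      bestScore = score;
    }
  }
  return best?.value;
}
```

A lower threshold raises the hit rate but also the chance of answering a differently worded question with a cached answer that does not quite fit, which matters for policy questions where small wording changes can alter coverage.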
Step 12: Create the API routes
Now wire everything together with Next.js App Router route handlers. Start with the ingestion endpoint at app/api/ingest/route.ts:
ts
import { type NextRequest, NextResponse } from "next/server";
import { IngestionService } from "../../../src/services/ingestion-service.js";
import { EmbeddingService } from "../../../src/services/embedding-service.js";
import { StorageService } from "../../../src/services/storage-service.js";

let ingestionService: IngestionService | null = null;

async function getIngestionService(): Promise<IngestionService> {
  if (!ingestionService) {
    const storageService = new StorageService(false);
    storageService.initialize();
    const embeddingService = new EmbeddingService();
    await embeddingService.initialize();
    ingestionService = new IngestionService(embeddingService, storageService);
  }
  return ingestionService;
}

export async function POST(req: NextRequest): Promise<NextResponse> {
  try {
    const formData = await req.formData();
    const file = formData.get("file");
    if (!file || !(file instanceof File)) {
      return NextResponse.json({ error: "PDF file required" }, { status: 400 });
    }
    const buffer = Buffer.from(await file.arrayBuffer());
    const tenantId = (formData.get("tenantId") as string) || "default";
    const metadata = {
      filename: file.name,
      description: (formData.get("description") as string) || "",
      tags: ((formData.get("tags") as string) || "").split(",").filter(Boolean),
    };
    const service = await getIngestionService();
    const result = await service.ingestPdf(buffer, metadata, tenantId);
    return NextResponse.json(result, { status: 201 });
  } catch (err) {
    const message = err instanceof Error ? err.message : "Ingestion failed";
    return NextResponse.json({ error: message }, { status: 500 });
  }
}
Create the query endpoint at app/api/query/route.ts:
ts
import { type NextRequest, NextResponse } from "next/server";
import { z } from "zod";
import { QueryService } from "../../../src/services/query-service.js";
import { CacheService } from "../../../src/services/cache-service.js";
import { EmbeddingService } from "../../../src/services/embedding-service.js";
import { StorageService } from "../../../src/services/storage-service.js";
import { DatabricksService } from "../../../src/services/databricks-service.js";
import { ContextPlanner } from "../../../src/lib/context-planner.js";
import { repairLlmOutput } from "../../../src/lib/repair.js";
import { recordLlmCall } from "../../../src/lib/cost-telemetry.js";
import { traceLlmCall } from "../../../src/lib/observability.js";

const querySchema = z.object({
  question: z.string().min(1),
  tenantId: z.string().min(1),
});

let queryService: QueryService | null = null;

async function getQueryService(): Promise<QueryService> {
  if (!queryService) {
    const storageService = new StorageService(false);
    storageService.initialize();
    const cacheService = new CacheService();
    const embeddingService = new EmbeddingService();
    await embeddingService.initialize();
    const contextPlanner = new ContextPlanner();
    const databricksService = new DatabricksService();
    queryService = new QueryService(
      cacheService,
      embeddingService,
      storageService,
      contextPlanner,
      databricksService,
      repairLlmOutput,
      recordLlmCall,
      traceLlmCall,
    );
  }
  return queryService;
}

export async function POST(req: NextRequest): Promise<NextResponse> {
  try {
    const body = await req.json() as Record<string, unknown>;
    const parsed = querySchema.safeParse(body);
    if (!parsed.success) {
      return NextResponse.json(
        { error: "Invalid request", details: parsed.error.issues },
        { status: 400 },
      );
    }
    const { question, tenantId } = parsed.data;
    const service = await getQueryService();
    const result = await service.query(question, tenantId);
    return NextResponse.json(result, { status: 200 });
  } catch (err) {
    const message = err instanceof Error ? err.message : "Query failed";
    return NextResponse.json({ error: message }, { status: 500 });
  }
}
Create the health check endpoint at app/api/health/route.ts:
Expected output: Three API routes — POST /api/ingest (multipart PDF upload), POST /api/query (JSON RAG query), and GET /api/health (dependency check). All use NextRequest/NextResponse from next/server.
Step 13: Run the tests
The project ships with a test suite covering every service, library module, and API route. Verify everything compiles and passes:
terminal
pnpm typecheck
Expected output: tsc exits 0 with no type errors.
terminal
pnpm lint
Expected output: ESLint exits 0 with no violations.
terminal
pnpm test
Expected output: All 111 tests pass (numFailedTests is 0). Coverage metrics (lines, branches, functions, statements) are all at 90% or above.
Key test categories:
Unit tests for each service (embedding-service, databricks-service, storage-service, cache-service, query-service, ingestion-service) — mocking external dependencies via vi.mock
Lib tests for config, context-planner, repair, cost-telemetry, observability — testing validation, token budgeting, Zod schema repair, and cost calculations
API route tests for health, ingest, query — constructing real NextRequest objects and invoking handlers directly
Next steps
Add document-level deduplication — store a content hash per document so re-uploading the same PDF returns the existing documentId instead of duplicating chunks
Add per-tenant budget enforcement — extend checkBudget() to query actual cumulative spend from a database instead of using a static global check
Replace InMemoryAdapter with Redis — swap the cache’s InMemoryAdapter for a Redis-backed adapter so cached responses survive server restarts and scale across instances
Deploy behind a rate limiter — add middleware to prevent individual tenants from overwhelming the Databricks serving endpoint
ts
// src/services/storage-service.ts: body of the StorageService class from Step 5
    null;
  private useInMemory: boolean;

  constructor(useInMemory = false) {
    this.useInMemory = useInMemory;
  }

  initialize(): void {
    if (!this.useInMemory) {
      this.store = new PostgresMemoryStorage({
        host: config.database.host,
        port: config.database.port,
        database: config.database.database,
        user: config.database.user,
        password: config.database.password,
        schema: "public",
      });
    } else {
      this.store = new InMemoryMemoryStorage();
    }
  }

  private buildMemoryObject(
    chunk: Omit<PolicyChunk, "id">,
    id: string,
  ): Memory {
    const now = new Date();
    const vec = chunk.embedding ?? [];
    const vector: number[] = vec.length > 0
      ? (() => {
          const result = pgvector.fromSql(pgvector.toSql(vec));
"You are an insurance policy analyst. Answer the question using ONLY the provided policy excerpts. Respond with a JSON object containing: answer (string), sources (array of excerpt IDs), confidence (0-1 number), coverageGaps (array of strings for uncovered areas).";