Small businesses drown in market reports, PDFs, and industry analyses but lack a way to turn them into actionable insights quickly. Manually sifting through hundreds of pages is slow and often misses critical connections across internal and external data.
This recipe builds a market research pipeline for small businesses. You upload PDF, DOCX, and XLSX files containing market reports, industry analyses, and competitive data. The pipeline processes those documents, indexes them into a Qdrant vector database, and gives you a chat interface that answers questions by retrieving relevant chunks and passing them to Perplexity’s language models — with spending limits enforced per user.
Prerequisites
Node.js 22 or higher
pnpm installed
Perplexity API key from perplexity.ai
OpenAI API key (for text-embedding-3-small embeddings)
Qdrant running locally or at a remote URL (Docker: docker run -p 6333:6333 qdrant/qdrant)
Step 1: Install dependencies
The scaffold is already in your working directory. Install everything in one shot:
terminal
pnpm install
This pulls in all pinned packages including @reaatech/hybrid-rag-ingestion, @reaatech/hybrid-rag-retrieval, perplexity-sdk, langfuse, unpdf, mammoth, xlsx, and the budget engine stack.
Step 2: Configure environment variables
Put your API keys and connection settings in a .env.local file at the project root. Langfuse keys are optional; tracing gracefully falls back to a no-op when they are absent.
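A sketch of the expected file is below. PERPLEXITY_API_KEY and the Langfuse key names appear in the code later in this recipe; OPENAI_API_KEY and QDRANT_URL are assumed names for the OpenAI embedding key and the Qdrant connection, so check them against your scaffold's config.ts.
terminal
cat > .env.local <<'EOF'
PERPLEXITY_API_KEY=pplx-...
OPENAI_API_KEY=sk-...
QDRANT_URL=http://localhost:6333
LANGFUSE_PUBLIC_KEY=pk-lf-...
LANGFUSE_SECRET_KEY=sk-lf-...
EOF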
Step 3: Define the shared config
The src/lib/config.ts file centralizes all configuration reading. It sets up Qdrant connection details, OpenAI embedding config, BM25 parameters, fusion strategy, and budget defaults:
BM25 uses k1=1.2, b=0.75 — standard IR parameters. Fusion uses RRF (Reciprocal Rank Fusion) which handles heterogeneous score distributions without tuning.
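As a reference point, here is a minimal sketch of what config.ts can look like. The getRequiredEnv helper and config.budget.defaultLimit are imported by later steps; the QDRANT_COLLECTION and BUDGET_DEFAULT_LIMIT variable names and the exact option shapes are assumptions, not the scaffold's definitive layout.
typescript
// src/lib/config.ts (sketch): env names other than PERPLEXITY_API_KEY
// and the Langfuse keys are assumptions for illustration.
export function getRequiredEnv(name: string): string {
  const value = process.env[name];
  if (!value) {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

export const config = {
  qdrant: {
    url: process.env.QDRANT_URL ?? 'http://localhost:6333',
    collection: process.env.QDRANT_COLLECTION ?? 'market-research', // assumed name
  },
  embedding: {
    model: 'text-embedding-3-small',
    apiKey: process.env.OPENAI_API_KEY ?? '',
  },
  bm25: {
    k1: 1.2, // term-frequency saturation
    b: 0.75, // document-length normalization
  },
  fusion: {
    strategy: 'rrf' as const, // Reciprocal Rank Fusion
  },
  budget: {
    defaultLimit: Number(process.env.BUDGET_DEFAULT_LIMIT ?? 10), // USD per user
  },
};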
Step 4: Define API request schemas
Use Zod to validate incoming requests at the boundary. The src/lib/schemas.ts file defines two schemas for the chat and evaluation endpoints, plus a budget check schema used internally:
typescript
import { z } from 'zod';

export const ChatRequestSchema = z.object({
  query: z.string().min(1, 'Query is required'),
  userId: z.string().min(1, 'userId is required'),
});

export const EvaluateRequestSchema = z.object({
  datasetPath: z.string().min(1, 'datasetPath is required'),
});

export const BudgetCheckSchema = z.object({
  scopeKey: z.string().min(1),
  estimatedCost: z.number().positive(),
  modelId: z.string().min(1),
  tools: z.array(z.string()).optional(),
});
The chat route parses its JSON body through ChatRequestSchema.parse() before doing anything else.
Step 5: Implement document ingestion
The ingestion service handles three file formats: PDF via unpdf, DOCX via mammoth, and XLSX via xlsx. Raw text passes through TextPreprocessor and DocumentValidator before chunking with chunkDocument using the semantic strategy:
typescript
import { TextPreprocessor, DocumentValidator, chunkDocument } from '@reaatech/hybrid-rag-ingestion';
import { ChunkingStrategy } from '@reaatech/hybrid-rag';
import type { Chunk, ChunkingConfig } from '@reaatech/hybrid-rag';
import { getDocumentProxy, extractText } from 'unpdf';
import mammoth from 'mammoth';
import * as XLSX from 'xlsx';

const preprocessor = new TextPreprocessor({
  normalizeUnicode: true,
  normalizeWhitespace: true,
});

const validator = new DocumentValidator({
  maxFileSize: 10 * 1024 * 1024, // 10 MB
  minContentLength: 1,
});

async function extractPdfText(buffer: Uint8Array): Promise<string> {
  const pdf = await getDocumentProxy(buffer);
  const { text } = await extractText(pdf, { mergePages: true });
  return text;
}

async function extractDocxText(buffer: Uint8Array): Promise<string> {
  const result = await mammoth.extractRawText({ buffer: Buffer.from(buffer) });
  return result.value;
}

function extractXlsxText(buffer: Uint8Array): string {
  const workbook = XLSX.read(buffer, { type: 'array' });
  const texts: string[] = [];
  for (const sheetName of workbook.SheetNames) {
    const sheet = workbook.Sheets[sheetName];
    if (!sheet) continue;
    const values = XLSX.utils.sheet_to_json<string[]>(sheet, { header: 1 });
    for (const row of values) {
      if (Array.isArray(row)) {
        // Keep only non-empty string cells; numeric cells are skipped.
        texts.push(
          row
            .filter((cell): cell is string => typeof cell === 'string' && cell.length > 0)
            .join(' '),
        );
      }
    }
  }
  return texts.join('\n');
}

async function extractTextByMime(buffer: Uint8Array, mimeType: string): Promise<string> {
  switch (mimeType) {
    case 'application/pdf':
      return extractPdfText(buffer);
    case 'application/vnd.openxmlformats-officedocument.wordprocessingml.document':
      return extractDocxText(buffer);
    case 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet':
      return extractXlsxText(buffer);
    default:
      throw new Error(`Unsupported MIME type: ${mimeType}`);
  }
}

export async function processDocument(
  fileBuffer: Uint8Array,
  mimeType: string,
  docId: string,
): Promise<Chunk[]> {
  const rawText = await extractTextByMime(fileBuffer, mimeType);
  const preprocessed = preprocessor.preprocess(rawText);
  const content = preprocessed.content;
  const doc = {
    id: docId,
    content,
    source: `upload-${docId}`,
    metadata: { mimeType },
  };
  const validation = validator.validate(doc);
  if (!validation.isValid) {
    throw new Error(`Document validation failed: ${validation.errors.join(', ')}`);
  }
  const chunkConfig: ChunkingConfig = {
    strategy: ChunkingStrategy.SEMANTIC,
    chunkSize: 512,
    overlap: 50,
    similarityThreshold: 0.5,
  };
  const chunks = await chunkDocument(content, docId, chunkConfig, doc.metadata);
  return chunks;
}
processDocument returns Chunk[] with deterministic IDs. Callers feed those chunks into the vector indexer.
Step 6: Index chunks into Qdrant
The vector indexer at src/services/vector-index.ts wraps HybridRetriever from @reaatech/hybrid-rag-retrieval. It lazy-initializes on first use and exposes indexChunks:
Call indexChunks after processDocument to store the vectors and the BM25 index in Qdrant.
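A sketch of the wrapper is below. The HybridRetriever constructor options and the name of its indexing method are assumptions inferred from how the recipe uses the package; the exported getRetriever helper is introduced here so the retrieval service in the next step can reuse the same lazy instance.
typescript
// src/services/vector-index.ts (sketch): constructor options and the
// retriever's indexChunks method name are assumptions, not the package's
// confirmed API.
import { HybridRetriever } from '@reaatech/hybrid-rag-retrieval';
import type { Chunk } from '@reaatech/hybrid-rag';
import { config } from '../lib/config.js';

let retriever: HybridRetriever | null = null;

// Lazy-initialize on first use so the Qdrant connection is only opened
// when a request actually needs it.
export function getRetriever(): HybridRetriever {
  if (!retriever) {
    retriever = new HybridRetriever({
      qdrantUrl: config.qdrant.url,
      collection: config.qdrant.collection,
      embedding: config.embedding,
      bm25: config.bm25,
      fusion: config.fusion,
    });
  }
  return retriever;
}

export async function indexChunks(chunks: Chunk[]): Promise<void> {
  const r = getRetriever();
  await r.indexChunks(chunks); // assumed method: stores vectors + BM25 index
}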
Step 7: Build the retrieval service
The retrieval service at src/services/retrieval.ts exposes retrieveContext which calls retriever.retrieve() with options for topK, metadata filters, and retrieval mode (hybrid, vector-only, or bm25-only):
The function defaults to hybrid retrieval mode. Override with retrievalMode: 'vector' or retrievalMode: 'bm25' for specific use cases.
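A minimal sketch, assuming retrieve() accepts an options object with topK, filters, and a mode key (the exact option names may differ in @reaatech/hybrid-rag-retrieval):
typescript
// src/services/retrieval.ts (sketch): option names on retrieve() are
// assumptions inferred from the recipe's description.
import { getRetriever } from './vector-index.js';

export interface RetrieveOptions {
  topK?: number;
  filters?: Record<string, unknown>;
  retrievalMode?: 'hybrid' | 'vector' | 'bm25';
}

export async function retrieveContext(query: string, options: RetrieveOptions = {}) {
  const retriever = getRetriever();
  return retriever.retrieve(query, {
    topK: options.topK ?? 5,
    filters: options.filters,
    mode: options.retrievalMode ?? 'hybrid', // hybrid = vector + BM25 fused via RRF
  });
}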
Step 8: Wire up the Perplexity chat service
The chat service at src/services/chat.ts initializes the Perplexity SDK client and wraps chatCompletionsPost. It builds a system message with the retrieved context and a user message with the query:
typescript
import Perplexity, {
  ChatCompletionsPostRequestModelEnum,
  ChatCompletionsPostRequestMessagesInner,
} from 'perplexity-sdk';
import { getRequiredEnv } from '../lib/config.js';

let client: ReturnType<typeof createClient> | null = null;

function createClient() {
  const p = new Perplexity({ apiKey: getRequiredEnv('PERPLEXITY_API_KEY') });
  return p.client();
}

function getClient() {
  if (!client) {
    client = createClient();
  }
  return client;
}

export async function generateAnswer(
  query: string,
  context: string,
  model?: string,
): Promise<{ answer: string; usage: { inputTokens: number; outputTokens: number } }> {
  const c = getClient();
  const systemContent = `You are a market research assistant. Use the following documents as context to answer the user's question accurately. If the context does not contain enough information, say so clearly.\n\nContext:\n${context}`;

  const systemMessage = new ChatCompletionsPostRequestMessagesInner();
  systemMessage.role = 'system';
  systemMessage.content = systemContent;

  const userMessage = new ChatCompletionsPostRequestMessagesInner();
  userMessage.role = 'user';
  userMessage.content = query;

  const result = await c.chatCompletionsPost({
    model: (model ?? ChatCompletionsPostRequestModelEnum.Mistral7bInstruct) as ChatCompletionsPostRequestModelEnum,
    messages: [systemMessage, userMessage],
  });

  const answer = result.choices?.[0]?.message?.content ?? '';
  const inputTokens = result.usage?.promptTokens ?? 0;
  const outputTokens = result.usage?.completionTokens ?? 0;

  return {
    answer,
    usage: { inputTokens, outputTokens },
  };
}
The default model is Mistral7bInstruct. The budget engine can suggest a cheaper model when spending is tight, and generateAnswer accepts that suggestion via the optional model parameter.
Step 9: Integrate the budget controller
The budget service at src/services/budget.ts uses BudgetController from @reaatech/agent-budget-engine with SpendStore from @reaatech/agent-budget-spend-tracker. It defines a wildcard user budget on startup with a 10 USD default limit, 80% soft cap, and 100% hard cap:
typescript
import { BudgetController } from '@reaatech/agent-budget-engine';
import { SpendStore } from '@reaatech/agent-budget-spend-tracker';
import { BudgetScope } from '@reaatech/agent-budget-types';
import type { BudgetCheckResult, BudgetState, SpendEntry } from '@reaatech/agent-budget-types';
import { PerplexityPricingProvider } from '../lib/pricing.js';
import { config } from '../lib/config.js';
import { nanoid } from 'nanoid';

const store = new SpendStore();
const pricing = new PerplexityPricingProvider();
const controller = new BudgetController({ spendTracker: store, pricing });

controller.defineBudget({
  scopeType: BudgetScope.User,
  scopeKey: '*',
  limit: config.budget.defaultLimit,
  policy: {
    softCap: 0.8,
    hardCap: 1.0,
    autoDowngrade: [],
    disableTools: [],
  },
});

export function checkBudget(
  scopeKey: string,
  estimatedCost: number,
  modelId: string,
  tools: string[] = [],
): BudgetCheckResult {
  return controller.check({
    scopeType: BudgetScope.User,
    scopeKey,
    estimatedCost,
    modelId,
    tools,
  });
}

export function recordSpend(
  scopeKey: string,
  cost: number,
  inputTokens: number,
  outputTokens: number,
  modelId: string,
  provider: string,
): void {
  const entry: SpendEntry = {
    requestId: nanoid(),
    scopeType: BudgetScope.User,
    scopeKey,
    cost,
    inputTokens,
    outputTokens,
    modelId,
    provider,
    timestamp: new Date(),
  };
  controller.record(entry);
}

export function getBudgetState(scopeKey: string): BudgetState | undefined {
  return controller.getState(BudgetScope.User, scopeKey);
}

export { controller };
checkBudget returns { allowed, action, suggestedModel, disabledTools }. The chat route checks allowed === false or action === EnforcementAction.HardStop and responds with 402.
Step 10: Add pricing provider
The pricing provider at src/lib/pricing.ts implements the PricingProvider interface expected by BudgetController. It defines Perplexity model pricing and estimates cost per request:
Output tokens are estimated as 25% of input tokens. Update the pricing table when Perplexity changes rates.
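The sketch below shows the shape of such a provider. The estimateCost method name is an assumption about the PricingProvider contract, and the per-token rates are illustrative placeholders rather than real Perplexity prices:
typescript
// src/lib/pricing.ts (sketch): method names and rates are placeholders.
interface ModelPricing {
  inputPerMillion: number;  // USD per 1M input tokens
  outputPerMillion: number; // USD per 1M output tokens
}

const PRICING: Record<string, ModelPricing> = {
  // Placeholder rates: replace with current Perplexity pricing.
  'mistral-7b-instruct': { inputPerMillion: 0.2, outputPerMillion: 0.2 },
};

export class PerplexityPricingProvider {
  estimateCost(modelId: string, inputTokens: number): number {
    const rates = PRICING[modelId];
    if (!rates) return 0;
    // Output tokens are unknown before the call; estimate them as 25% of input.
    const outputTokens = Math.ceil(inputTokens * 0.25);
    return (
      (inputTokens / 1_000_000) * rates.inputPerMillion +
      (outputTokens / 1_000_000) * rates.outputPerMillion
    );
  }
}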
Step 11: Create the ingest API route
The ingest route at app/api/ingest/route.ts accepts multipart uploads, runs processDocument, then indexChunks, and returns the document ID and chunk count:
Missing file returns 400. Oversized file returns 413. Any other error returns 400 with the error message.
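A sketch of the handler, assuming the ingestion service lives at src/services/ingestion.ts (the recipe does not name that path) and using nanoid for document IDs:
typescript
// app/api/ingest/route.ts (sketch): import paths assume the layout
// described in this recipe.
import { NextResponse } from 'next/server';
import { nanoid } from 'nanoid';
import { processDocument } from '../../../src/services/ingestion.js';
import { indexChunks } from '../../../src/services/vector-index.js';

const MAX_FILE_SIZE = 10 * 1024 * 1024; // matches the DocumentValidator limit

export async function POST(request: Request) {
  const formData = await request.formData();
  const file = formData.get('file');
  if (!(file instanceof File)) {
    return NextResponse.json({ error: 'Missing file' }, { status: 400 });
  }
  if (file.size > MAX_FILE_SIZE) {
    return NextResponse.json({ error: 'File too large' }, { status: 413 });
  }
  try {
    const documentId = nanoid();
    const buffer = new Uint8Array(await file.arrayBuffer());
    const chunks = await processDocument(buffer, file.type, documentId);
    await indexChunks(chunks);
    return NextResponse.json({ documentId, chunkCount: chunks.length });
  } catch (err) {
    const message = err instanceof Error ? err.message : 'Ingest failed';
    return NextResponse.json({ error: message }, { status: 400 });
  }
}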
Step 12: Create the chat API route
The chat route at app/api/chat/route.ts orchestrates retrieval, budget check, generation, and spend recording in sequence. It parses the request with Zod, checks budget before calling Perplexity, records spend after a successful response, and traces the generation span with Langfuse:
The response includes answer, sources (the top retrieved chunks with scores), and usage (token counts). Budget exceeded returns 402.
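The sketch below shows the orchestration order. The retrieved-chunk shape is assumed to match the sources type used by the server actions in Step 14, the cost estimate is a placeholder where the real route would call the pricing provider, and the Langfuse span is omitted for brevity:
typescript
// app/api/chat/route.ts (sketch): retrieve, check budget, generate, record.
import { NextResponse } from 'next/server';
import { ChatRequestSchema } from '../../../src/lib/schemas.js';
import { retrieveContext } from '../../../src/services/retrieval.js';
import { generateAnswer } from '../../../src/services/chat.js';
import { checkBudget, recordSpend } from '../../../src/services/budget.js';

// Assumed result shape; adjust to whatever HybridRetriever actually returns.
type RetrievedChunk = { chunkId: string; documentId: string; content: string; score: number };

export async function POST(request: Request) {
  let body: { query: string; userId: string };
  try {
    body = ChatRequestSchema.parse(await request.json());
  } catch {
    return NextResponse.json({ error: 'Invalid request body' }, { status: 400 });
  }

  // 1. Retrieve the most relevant chunks for the query.
  const results = (await retrieveContext(body.query, { topK: 5 })) as RetrievedChunk[];
  const context = results.map((r) => r.content).join('\n\n');

  // 2. Check the budget before spending anything. The cost here is a
  // placeholder; the real route derives it from the pricing provider.
  // The full implementation also checks decision.action against
  // EnforcementAction.HardStop.
  const modelId = 'mistral-7b-instruct'; // placeholder id for the default model
  const estimatedCost = 0.01;
  const decision = checkBudget(body.userId, estimatedCost, modelId);
  if (!decision.allowed) {
    return NextResponse.json({ error: 'Budget exceeded' }, { status: 402 });
  }

  // 3. Generate, honoring a cheaper model suggestion when one is given.
  const { answer, usage } = await generateAnswer(body.query, context, decision.suggestedModel);

  // 4. Record spend only after a successful response.
  recordSpend(body.userId, estimatedCost, usage.inputTokens, usage.outputTokens, modelId, 'perplexity');

  return NextResponse.json({ answer, sources: results, usage });
}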
Step 13: Add Langfuse tracing
The langfuse service at src/services/langfuse.ts initializes the client lazily when keys are present, and provides trace helpers that the route handlers call:
When LANGFUSE_PUBLIC_KEY or LANGFUSE_SECRET_KEY are absent, getTracer() returns null and all trace calls become no-ops.
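A sketch of the lazy initialization follows; traceGeneration stands in for whatever trace helpers the routes actually call:
typescript
// src/services/langfuse.ts (sketch): the helper name is an assumption.
import { Langfuse } from 'langfuse';

let tracer: Langfuse | null = null;
let initialized = false;

export function getTracer(): Langfuse | null {
  if (!initialized) {
    initialized = true;
    const publicKey = process.env.LANGFUSE_PUBLIC_KEY;
    const secretKey = process.env.LANGFUSE_SECRET_KEY;
    // Tracing is optional: without both keys, getTracer() stays null
    // and every trace helper below becomes a no-op.
    if (publicKey && secretKey) {
      tracer = new Langfuse({ publicKey, secretKey });
    }
  }
  return tracer;
}

export function traceGeneration(name: string, input: unknown, output: unknown): void {
  const t = getTracer();
  if (!t) return; // no-op when keys are absent
  const trace = t.trace({ name });
  trace.generation({ name, input, output });
}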
Step 14: Add server actions for the frontend
The app/actions.ts file exposes ingestDocument and sendMessage as server actions, which the React frontend calls directly without needing fetch URLs:
typescript
'use server';

const BASE_URL = process.env.NEXT_PUBLIC_BASE_URL || 'http://localhost:3000';

export async function ingestDocument(
  formData: FormData,
): Promise<{ documentId: string; chunkCount: number }> {
  const res = await fetch(`${BASE_URL}/api/ingest`, {
    method: 'POST',
    body: formData,
  });
  if (!res.ok) {
    const err = await res.json() as { error: string };
    throw new Error(err.error || 'Ingest failed');
  }
  return res.json() as Promise<{ documentId: string; chunkCount: number }>;
}

export async function sendMessage(
  query: string,
  userId: string,
): Promise<{
  answer: string;
  sources: Array<{ chunkId: string; documentId: string; content: string; score: number }>;
  usage: { inputTokens: number; outputTokens: number };
}> {
  const res = await fetch(`${BASE_URL}/api/chat`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ query, userId }),
  });
  if (!res.ok) {
    const err = await res.json() as { error: string };
    throw new Error(err.error || 'Chat failed');
  }
  return res.json() as Promise<{
    answer: string;
    sources: Array<{ chunkId: string; documentId: string; content: string; score: number }>;
    usage: { inputTokens: number; outputTokens: number };
  }>;
}
The frontend at app/page.tsx uses these actions to handle file uploads and chat messages in a single-page interface.
Step 15: Run the tests
The test suite covers ingestion, retrieval, chat, budget, and the route handlers. Run it with:
terminal
pnpm test
Expected output: all 91 tests pass across 35 test suites, with zero failures. The unit tests mock external dependencies with vi.mock and MSW interceptors, so they run without any live network calls.
Next steps
Run evaluation against a ground-truth dataset to measure retrieval quality: create a .jsonl file with query_id, query, relevant_docs, and relevant_chunks, then POST to /api/evaluate (an example line is sketched after this list).
Swap the in-memory SpendStore for a Redis-backed store to persist budget state across restarts.
Add authentication middleware so each user gets their own budget scope instead of sharing the wildcard.
Experiment with different ChunkingStrategy values (fixed-size, recursive, sliding-window) to find the best chunking for your document types.
Connect the evaluation API to a CI pipeline so chunking and retrieval changes are validated automatically before merge.
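For the evaluation dataset in the first item, a line might look like this (the IDs and query are hypothetical):
terminal
printf '%s\n' '{"query_id":"q1","query":"What is the projected market size for segment X?","relevant_docs":["doc-abc"],"relevant_chunks":["doc-abc-chunk-3"]}' >> eval-dataset.jsonl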