You’ll build an invoice processing pipeline that accepts PDF and image uploads, extracts structured data using Anthropic’s Claude, validates every result against a schema, retries on failure, and tracks per-invoice costs — all orchestrated through a Redis-backed job queue. By the end, you’ll have a working Next.js API with three endpoints, a background worker that processes up to four invoices concurrently, and a test suite that confirms every gate fires correctly.
Prerequisites
Node.js >= 22 (required by the engines field in package.json)
pnpm 10.0.0 (the project’s package manager, declared as pnpm@10.0.0 in the packageManager field of package.json)
Redis running locally on port 6379 (or a remote REDIS_URL you supply)
Anthropic API key with access to Claude 3 Haiku (ANTHROPIC_API_KEY)
Familiarity with TypeScript, Next.js App Router, and async job processing. You don’t need prior experience with Bull or the REAA agent libraries — they’re wired up step by step.
Step 1: Scaffold the project
Create a new directory and add the project configuration files. These pin every version and tell Next.js how to handle binary packages like sharp and ioredis.
Run pnpm install to pull everything down. The postinstall script from package.json runs a compatibility shim for the REAA packages.
terminal
pnpm install
Expected output: the terminal prints a progress bar for resolution and fetching, then the postinstall script completes silently. When finished, you’ll see a node_modules/ directory and a pnpm-lock.yaml file in your project root.
Step 3: Set environment variables
Copy the example file and edit it with your real values. Every variable here is read at runtime — none are optional.
terminal
cp .env.example .env
Open .env and replace the placeholder:
env
# Anthropic API key for Claude invoice extraction
ANTHROPIC_API_KEY=sk-ant-api03-your-key-here

# Redis connection string for job queue and budget tracking
REDIS_URL=redis://localhost:6379

# Queue configuration
QUEUE_NAME=invoice-processing

# MCP request timeout in milliseconds (default: 30000)
MCP_REQUEST_TIMEOUT_MS=30000

# Enable or disable circuit breaker (default: true)
ENABLE_CIRCUIT_BREAKER=true

# MCP max retries for agent dispatch (default: 3)
MCP_MAX_RETRIES=3
If your Redis is on a different host or port, update REDIS_URL accordingly. The ANTHROPIC_API_KEY must be a valid Anthropic key with access to claude-3-haiku-20240307.
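As a rough sketch of how these variables could be read and checked once at startup, the helper below fails fast on a missing value. The names `loadConfig`, `requireVar`, `intVar`, and `PipelineConfig` are hypothetical and not part of the project; the fallback values simply mirror the defaults mentioned in the comments of the example file.

```typescript
// Hypothetical startup check; only the environment variable names come from .env.example.
interface PipelineConfig {
  anthropicApiKey: string;
  redisUrl: string;
  queueName: string;
  mcpRequestTimeoutMs: number;
  enableCircuitBreaker: boolean;
  mcpMaxRetries: number;
}

type EnvMap = Record<string, string | undefined>;

// Returns the value or throws if the variable is unset or blank.
function requireVar(env: EnvMap, key: string): string {
  const value = env[key];
  if (value === undefined || value.trim() === '') {
    throw new Error(`Missing required environment variable: ${key}`);
  }
  return value;
}

// Parses a non-negative integer, using the fallback when the variable is unset.
function intVar(env: EnvMap, key: string, fallback: number): number {
  const raw = env[key];
  if (raw === undefined || raw.trim() === '') return fallback;
  const parsed = Number.parseInt(raw, 10);
  if (Number.isNaN(parsed) || parsed < 0) {
    throw new Error(`${key} must be a non-negative integer, got "${raw}"`);
  }
  return parsed;
}

function loadConfig(env: EnvMap): PipelineConfig {
  return {
    anthropicApiKey: requireVar(env, 'ANTHROPIC_API_KEY'),
    redisUrl: requireVar(env, 'REDIS_URL'),
    queueName: requireVar(env, 'QUEUE_NAME'),
    mcpRequestTimeoutMs: intVar(env, 'MCP_REQUEST_TIMEOUT_MS', 30000),
    // Anything other than the literal string "false" keeps the breaker enabled.
    enableCircuitBreaker: (env['ENABLE_CIRCUIT_BREAKER'] ?? 'true') !== 'false',
    mcpMaxRetries: intVar(env, 'MCP_MAX_RETRIES', 3),
  };
}
```

In a Node.js process this would be called as `loadConfig(process.env)`, so a misconfigured deployment stops at boot instead of failing on the first job.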
Step 4: Create shared types and Zod schema
These two files define the data shapes that flow through every stage of the pipeline. The types are plain TypeScript interfaces; the schema is a Zod validator that the validation gates check extracted data against.
Three files handle the heavy lifting: parsing PDFs, building the few-shot prompt with schema instructions, and calling Claude with cost and error management.
Create src/lib/prompt-builder.ts. This module assembles a system prompt, a JSON schema, three few-shot examples, and the document text — truncating at 180,000 characters if needed:
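As a minimal illustration of the assembly and truncation behaviour described above (this is not the project's actual prompt-builder.ts), the sketch below joins the four parts in order and cuts the document text at the stated limit. The names `assemblePrompt`, `truncateDocument`, and `FewShotExample`, and the exact section labels in the output, are assumptions.

```typescript
// Limit taken from the description above; everything else here is illustrative.
const MAX_DOCUMENT_CHARS = 180000;

interface FewShotExample {
  input: string;
  output: string;
}

// Cuts the document text so the prompt stays within the character limit.
function truncateDocument(text: string, maxChars: number = MAX_DOCUMENT_CHARS): string {
  return text.length > maxChars ? text.slice(0, maxChars) : text;
}

// Joins system prompt, schema, few-shot examples, and document into one string.
function assemblePrompt(
  systemPrompt: string,
  jsonSchema: string,
  examples: FewShotExample[],
  documentText: string,
): string {
  const exampleBlocks = examples.map(
    (ex, i) => `Example ${i + 1}\nInput:\n${ex.input}\nOutput:\n${ex.output}`,
  );
  return [
    systemPrompt,
    `JSON schema:\n${jsonSchema}`,
    ...exampleBlocks,
    `Document:\n${truncateDocument(documentText)}`,
  ].join('\n\n');
}
```

Placing the document last means truncation only ever removes trailing document text, never the instructions or examples.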
Create src/lib/invoke-claude.ts. This is the core extraction function — it estimates tokens, checks the budget, sends the prompt, parses the JSON response, and records actual spend:
ts
import { getAnthropicClient } from './anthropic-client.js';
import type { Anthropic } from '@anthropic-ai/sdk';
import { buildExtractionPrompt } from './prompt-builder.js';
import { getCostTracker } from './cost-tracker.js';
import type { ExtractedData } from '../types/index.js';

export class ParseError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ParseError';
  }
}

export class RateLimitError extends Error {
  retryAfter: number;
  constructor(retryAfter: number) {
    super(`Rate limit exceeded. Retry after ${retryAfter}s`);
    this.name = 'RateLimitError';
    this.retryAfter = retryAfter;
  }
}

export class AnthropicServerError extends Error {
  constructor(message: string) {
    super(`Anthropic server error: ${message}`);
    this.name = 'AnthropicServerError';
  }
}

export class AnthropicClientError extends Error {
  constructor(message: string, status?: number) {
    super(`Anthropic client error (${status ?? 400}): ${message}`);
    this.name = 'AnthropicClientError';
  }
}

const MODEL_ID = 'claude-3-haiku-20240307';

export async function extractInvoice(
  documentText: string,
  scopeKey: string,
): Promise<ExtractedData> {
  const prompt = buildExtractionPrompt(documentText);
  const estimatedInputTokens = Math.ceil(prompt.length / 4);
  const estimatedOutputTokens = 1024;

  const costTracker = getCostTracker();
  await costTracker.checkBeforeCall(scopeKey, estimatedInputTokens, estimatedOutputTokens, MODEL_ID);

  const client = getAnthropicClient();
  let response: Anthropic.Message;
  try {
    response = await client.messages.create({
      model: MODEL_ID,
      max_tokens: 4096,
      temperature: 0,
      messages: [{ role: 'user', content: prompt }],
    });
  } catch (err: unknown) {
    const errObj = err as Error & { status?: number; headers?: Record<string, string> };
    if (errObj.status === 429) {
      const headers = errObj.headers ?? {};
      const retryAfter = parseInt(headers['retry-after'] ?? headers['Retry-After'] ?? '30', 10);
      throw new RateLimitError(retryAfter);
    }
    if (errObj.status !== undefined && errObj.status >= 500) {
      throw new AnthropicServerError(errObj.message);
    }
    if (errObj.status !== undefined && errObj.status >= 400) {
      throw new AnthropicClientError(errObj.message, errObj.status);
    }
    throw err;
  }

  const content = response.content[0];
  if (!content || content.type !== 'text') {
    throw new ParseError('Empty response from Anthropic');
  }

  let parsed: unknown;
  try {
    parsed = JSON.parse(content.text);
  } catch {
    throw new ParseError(`Failed to parse response as JSON: ${content.text.slice(0, 200)}`);
  }

  const actualInputTokens = response.usage.input_tokens ?? estimatedInputTokens;
  const actualOutputTokens = response.usage.output_tokens ?? estimatedOutputTokens;
  await costTracker.recordAfterCall(
    scopeKey,
    response.id,
    actualInputTokens,
    actualOutputTokens,
    MODEL_ID,
  );

  return parsed as ExtractedData;
}
Note: invoke-claude.ts imports getCostTracker from ./cost-tracker.js. We’ll create that in the next step.
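The error branches and the token estimate in the listing can be isolated into small pure functions, which makes them easy to unit test. The sketch below is one possible factoring, not the project's code: `classifyStatus`, `parseRetryAfterSeconds`, and `estimateTokens` are hypothetical names. It also falls back to 30 seconds when the Retry-After header is not a plain number (for example an HTTP date), a case where `parseInt` alone would yield `NaN`.

```typescript
type FailureKind = 'rate-limit' | 'server' | 'client' | 'unknown';

// Mirrors the status checks in the catch block: 429, then 5xx, then other 4xx.
function classifyStatus(httpStatus: number | undefined): FailureKind {
  if (httpStatus === 429) return 'rate-limit';
  if (httpStatus !== undefined && httpStatus >= 500) return 'server';
  if (httpStatus !== undefined && httpStatus >= 400) return 'client';
  return 'unknown';
}

// Reads Retry-After as whole seconds, using the fallback for missing or non-numeric values.
function parseRetryAfterSeconds(
  headers: Record<string, string | undefined>,
  fallbackSeconds: number = 30,
): number {
  const raw = headers['retry-after'] ?? headers['Retry-After'];
  const parsed = raw === undefined ? NaN : Number.parseInt(raw, 10);
  return Number.isNaN(parsed) || parsed < 0 ? fallbackSeconds : parsed;
}

// Same heuristic as the listing: roughly four characters per token, rounded up.
function estimateTokens(text: string): number {
  return Math.ceil(text.length / 4);
}
```

The four-characters-per-token figure is only a pre-flight estimate; the listing replaces it with the usage numbers reported in the API response once the call returns.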
Step 7: Create cost tracking
The cost tracker wraps the agent-budget-engine with a Redis-backed spend store and Anthropic-specific pricing. Every API call runs a pre-flight budget check and records usage afterward.
import { BudgetController } from '@reaatech/agent-budget-engine';
import { Redis } from 'ioredis';
import { RedisSpendStore } from './spend-store.js';
import { AnthropicPricingProvider } from './pricing.js';
import { getRedis } from './redis.js';

const BudgetScope = { User: 'user' } as const;

export class CostTracker {
  private controller: BudgetController;

  constructor() {
    const redis: Redis = getRedis();
    const spendStore = new RedisSpendStore(redis);
    const pricingProvider = new AnthropicPricingProvider();
    this.controller = new BudgetController({
      spendTracker: spendStore as never,
      pricing: pricingProvider as never,
    });
  }

  async defineInvoiceBudget(scopeKey: string, limitUsd: number): Promise<void> {
    this.controller.defineBudget({
      scopeType: BudgetScope.User as never,
      scopeKey,
      limit: limitUsd,
      policy: { softCap: 0.8, hardCap: 1.0, autoDowngrade: [], disableTools: [] } as never,
    });
  }

  async checkBeforeCall(
    scopeKey: string,
    estimatedInputTokens: number,
    estimatedOutputTokens: number,
    modelId: string,
  ): Promise<void> {
    const pricingProvider = new AnthropicPricingProvider();
    const estimatedCost = pricingProvider.estimate(modelId, estimatedInputTokens, estimatedOutputTokens);
    const result = this.controller.check({
      scopeType: BudgetScope.User as never,
      scopeKey,
      estimatedCost,
      modelId,
      tools: [],
    } as never);
    const resultObj = result as never as { allowed: boolean };
    if (!resultObj.allowed) {
      throw Object.assign(
        new Error(`Budget exceeded for scope ${scopeKey}`),
        { code: 'BudgetExceededError' },
      );
    }
  }

  async recordAfterCall(
    scopeKey: string,
    requestId: string,
    inputTokens: number,
    outputTokens: number,
    modelId: string,
  ): Promise<void> {
    const pricingProvider = new AnthropicPricingProvider();
    const cost = pricingProvider.estimate(modelId, inputTokens, outputTokens);
    this.controller.record({
      requestId,
      scopeType: BudgetScope.User as never,
      scopeKey,
      cost,
      inputTokens,
      outputTokens,
      modelId,
      provider: 'anthropic',
      timestamp: new Date(),
    } as never);
  }

  async getSpendForScope(scopeKey: string): Promise<{ spent: number; remaining: number; status: string }> {
    const state = this.controller.getState(BudgetScope.User as never, scopeKey);
    const stateObj = (state ?? {}) as never as { spent: number; remaining: number; state: string };
    return {
      spent: stateObj.spent ?? 0,
      remaining: stateObj.remaining ?? 0,
      status: stateObj.state ?? 'Active',
    };
  }
}

let costTrackerInstance: CostTracker | null = null;

export function getCostTracker(): CostTracker {
  if (!costTrackerInstance) {
    costTrackerInstance = new CostTracker();
  }
  return costTrackerInstance;
}
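To make the pre-flight check concrete, here is a small in-memory sketch of the arithmetic involved: a per-million-token cost estimate and a decision against the soft cap (0.8) and hard cap (1.0) that the listing passes as policy. How the budget engine itself interprets those caps is not shown in this recipe, so the `decideBudget` logic, the `ModelRate` shape, and the rates used in the usage line are all assumptions for illustration.

```typescript
// Rates are expressed in USD per million tokens; the caller supplies them.
interface ModelRate {
  inputPerMTok: number;
  outputPerMTok: number;
}

function estimateCostUsd(rate: ModelRate, inputTokens: number, outputTokens: number): number {
  return (inputTokens * rate.inputPerMTok + outputTokens * rate.outputPerMTok) / 1000000;
}

type BudgetDecision = 'allow' | 'warn' | 'block';

// Assumed semantics: block when projected spend exceeds limit * hardCap,
// warn once it reaches limit * softCap, otherwise allow.
function decideBudget(
  spentUsd: number,
  estimatedCostUsd: number,
  limitUsd: number,
  softCap: number = 0.8,
  hardCap: number = 1.0,
): BudgetDecision {
  const projected = spentUsd + estimatedCostUsd;
  if (projected > limitUsd * hardCap) return 'block';
  if (projected >= limitUsd * softCap) return 'warn';
  return 'allow';
}
```

For example, with illustrative rates of `{ inputPerMTok: 0.25, outputPerMTok: 1.25 }`, a call estimated at 2,000 input tokens and 1,024 output tokens costs `estimateCostUsd(rate, 2000, 1024)`, and that figure is what gets compared against the remaining budget for the invoice's scope key.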
Step 8: Create validation gates and retry logic
Three gates check every extraction: schema shape, required fields, and a minimum confidence score. If any gate fails, the runbook module retries the extraction with exponential backoff.
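Before looking at the gate engine wiring, it may help to see the three checks as plain functions. The sketch below does not use the gate library or Zod: the gate shape, the field names `invoiceNumber`, `total`, and `confidence`, and the 0.7 threshold are hypothetical stand-ins, and the schema gate is reduced to a bare object check. Only the name `schema-gate` comes from the project.

```typescript
interface GateResult {
  name: string;
  passed: boolean;
  reason: string;
}

type Gate = (data: unknown) => GateResult;

const REQUIRED_FIELDS: string[] = ['invoiceNumber', 'total']; // hypothetical field names
const MIN_CONFIDENCE = 0.7; // hypothetical threshold

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

// Stand-in for the Zod check: only verifies the payload is a plain object.
const schemaGate: Gate = (data) => {
  const ok = isRecord(data);
  return { name: 'schema-gate', passed: ok, reason: ok ? 'Payload is an object' : 'Payload is not an object' };
};

const requiredFieldsGate: Gate = (data) => {
  const record: Record<string, unknown> = isRecord(data) ? data : {};
  const missing = REQUIRED_FIELDS.filter((field) => record[field] === undefined || record[field] === null);
  return {
    name: 'required-fields-gate',
    passed: missing.length === 0,
    reason: missing.length === 0 ? 'All required fields present' : `Missing fields: ${missing.join(', ')}`,
  };
};

const confidenceGate: Gate = (data) => {
  const record: Record<string, unknown> = isRecord(data) ? data : {};
  const score = record['confidence'];
  const ok = typeof score === 'number' && score >= MIN_CONFIDENCE;
  return { name: 'confidence-gate', passed: ok, reason: ok ? 'Confidence is sufficient' : 'Confidence is missing or too low' };
};

// Runs every gate and collects the reasons of those that failed.
function runGates(data: unknown, gates: Gate[]): { valid: boolean; errors: string[] } {
  const failures = gates.map((gate) => gate(data)).filter((result) => !result.passed);
  return { valid: failures.length === 0, errors: failures.map((f) => `${f.name}: ${f.reason}`) };
}
```

Running all gates instead of stopping at the first failure gives the retry step a complete list of what was wrong with an extraction.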
Create src/lib/validation.ts:
ts
import { createGateEngine, writeJUnitReport } from '@reaatech/agent-eval-harness-gate';
import { InvoiceSchema } from './schema.js';
import type { InvoiceValidationResult } from '../types/index.js';

let engine: ReturnType<typeof createGateEngine> | null = null;

function getEngine() {
  if (!engine) {
    engine = createGateEngine([
      {
        name: 'schema-gate',
        type: 'custom' as never,
        description: 'Validates invoice data against the InvoiceSchema',
        customFn: (results
Create src/lib/runbook.ts:
ts
import { withRetry, TimeoutError, ValidationError } from '@reaatech/agent-handoff';
import type { InvoiceValidationResult, ExtractedData } from '../types/index.js';

export async function retryExtraction(
  extractFn: () => Promise<ExtractedData>,
  validatorFn: (data: unknown) => Promise<InvoiceValidationResult>,
  maxRetries = 3,
): Promise<ExtractedData> {
  const wrappedFn = async (): Promise<ExtractedData> => {
    const result = await extractFn();
    const validation = await validatorFn(result);
    if (!validation.valid) {
      const error = new ValidationError('Validation failed', validation.errors);
      (error as never as { details: string[] }).details = validation.errors;
      throw error;
    }
    return result;
  };

  return withRetry<ExtractedData>(wrappedFn, {
    maxRetries,
    backoff: 'exponential',
    baseDelayMs: 1000,
    maxDelayMs: 16000,
    shouldRetry: (err: unknown) => {
      if (err instanceof ValidationError) return true;
      if (err instanceof TimeoutError) return true;
      return false;
    },
  });
}
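The retry options above (exponential backoff, 1,000 ms base, 16,000 ms ceiling) imply a delay schedule that is easy to compute by hand. The sketch below shows the conventional doubling formula; whether `withRetry` uses exactly this formula, or adds jitter, is an assumption, and `backoffDelayMs` and `backoffSchedule` are hypothetical names.

```typescript
// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at the maximum.
function backoffDelayMs(attempt: number, baseDelayMs: number = 1000, maxDelayMs: number = 16000): number {
  return Math.min(baseDelayMs * Math.pow(2, attempt), maxDelayMs);
}

// Full list of delays for a given number of retries.
function backoffSchedule(retries: number, baseDelayMs: number = 1000, maxDelayMs: number = 16000): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < retries; attempt++) {
    delays.push(backoffDelayMs(attempt, baseDelayMs, maxDelayMs));
  }
  return delays;
}

// backoffSchedule(3) returns [1000, 2000, 4000]
// backoffSchedule(6) returns [1000, 2000, 4000, 8000, 16000, 16000]
```

Under this formula the default of three retries never reaches the 16-second ceiling; the cap only matters if `maxRetries` is raised to five or more.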
Step 9: Create the job queue, processor, and worker
The Bull queue decouples file upload from processing. The job processor ties PDF parsing, Claude extraction, validation gates, and runbook retries into one function. The worker consumes jobs from the queue and runs the processor on several of them concurrently.
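The concurrency limit is the part of this step that is easiest to get wrong, so here is an in-memory sketch of the idea without Bull or Redis: a fixed number of lanes pull items from a shared list until it is empty. In the real pipeline the queue library handles this; `runWithConcurrency` is a hypothetical helper used only to illustrate the "at most four at a time" behaviour.

```typescript
// Processes items with at most `limit` worker calls in flight; results keep input order.
async function runWithConcurrency<T, R>(
  items: T[],
  limit: number,
  worker: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let nextIndex = 0;

  // Each lane claims the next unprocessed index until none remain.
  async function lane(): Promise<void> {
    while (nextIndex < items.length) {
      const index = nextIndex++;
      results[index] = await worker(items[index] as T);
    }
  }

  const laneCount = Math.max(0, Math.min(limit, items.length));
  const lanes: Promise<void>[] = [];
  for (let i = 0; i < laneCount; i++) {
    lanes.push(lane());
  }
  await Promise.all(lanes);
  return results;
}
```

Calling `runWithConcurrency(invoices, 4, processInvoice)` with a hypothetical `processInvoice` function would keep four extractions running until the list is drained. If any worker call rejects, the returned promise rejects too, which is why the real processor wraps extraction in the retry logic from the previous step.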
Expected output: Vitest prints a test report with all 109 tests passing across several suites. You’ll see sections for the pipeline route (MIME type validation, file size limits, and error handling), the cost tracker (budget enforcement and spend recording), validation (schema checks and confidence gates), queue behavior, and branch coverage tests for edge cases like encrypted PDFs and null text results. The summary line reads something like Tests 109 passed (109).
To check TypeScript types:
terminal
pnpm typecheck
This runs tsc --noEmit and should exit silently with no output, confirming zero type errors.
Next steps
Run pnpm dev in one terminal and pnpm start-worker in another to start processing real invoices. Upload a PDF with curl -F "file=@invoice.pdf" http://localhost:3000/api/pipeline, then poll the returned statusUrl.
Browse http://localhost:3000/api/dashboard/stats to see aggregated throughput and spend metrics across all processed invoices.
Swap the MODEL_ID in src/lib/invoke-claude.ts from claude-3-haiku-20240307 to claude-sonnet-4-20250514 for higher extraction accuracy at a higher per-call cost — the cost tracker picks up the new pricing automatically from src/lib/pricing.ts.
: never) => {
  const r = results as { metricBreakdown: Record<string, unknown> };
  const payload = r.metricBreakdown;
  const parsed = InvoiceSchema.safeParse(payload);
  if (parsed.success) {
    return { name: 'schema-gate' as never, passed: true as never, reason: 'Invoice data matches schema' as never };