This recipe builds an invoice extraction pipeline that turns PDF and image invoices into structured data and syncs them directly into Xero. You’ll use OpenAI’s GPT-5.2 vision model via the Responses API, validate output against Zod schemas, repair malformed results automatically, route high-confidence extractions to Xero and flag low-confidence ones for human review. The pipeline uses Langfuse observability, a spend-based budget controller, an S3 document archive, and a Next.js admin API. It runs on Node.js 22+ with Next.js 16 (App Router).
Prerequisites
Node.js >= 22
pnpm (v10+)
An OpenAI API key with access to gpt-5.2 (or equivalent vision-capable model)
A Xero developer account with an OAuth2 client_credentials (Custom Connection) app
An AWS S3 bucket with IAM credentials (access key + secret)
A Langfuse account (optional; for tracing and observability)
Familiarity with Next.js App Router, TypeScript, and basic shell usage
Step 1: Scaffold the project and configure dependencies
Create a new Next.js project with pinned dependency versions — no ^ or ~ ranges — since the @reaatech/* packages are vendored.
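You also need the standard Next.js config files. The recipe's next.config.ts sets the experimental.instrumentationHook flag for the Langfuse startup hook in Step 7. However, instrumentation.ts has been stable since Next.js 15, so on Next.js 16 the flag is no longer needed and can be left out.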
Expected output: Running pnpm typecheck should complete without errors. The dependencies are all pinned to exact versions — confirm with grep -n '"[~^>]' package.json which should return nothing.
Step 2: Configure environment variables
The environment template, .env.example, lists the variables the pipeline reads:
```env
# Env vars used by openai-invoice-extraction-for-xero-smb-accounting.
# Keep placeholders only; never commit real values.
NODE_ENV=development

# OpenAI
OPENAI_API_KEY=<your-openai-key>
OPENAI_MODEL=gpt-5.2

# Xero (Custom Connections / client_credentials grant)
XERO_CLIENT_ID=<your-xero-client-id>
XERO_CLIENT_SECRET=<your-xero-client-secret>

# AWS S3
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=<your-access-key>
AWS_SECRET_ACCESS_KEY=<your-secret>
S3_BUCKET_NAME=<your-bucket-name>

# Langfuse observability
LANGFUSE_PUBLIC_KEY=<your-langfuse-public-key>
LANGFUSE_SECRET_KEY=<your-langfuse-secret-key>
LANGFUSE_BASE_URL=<your-langfuse-base-url>

# Confidence routing
CONFIDENCE_ROUTE_THRESHOLD=0.8
CONFIDENCE_FALLBACK_THRESHOLD=0.3

# Budget control
BUDGET_MONTHLY_LIMIT=50.0

# Embeddings
EMBEDDING_MODEL=text-embedding-3-small
EMBEDDING_BATCH_SIZE=100
```
Copy it to .env (cp .env.example .env) and fill in the real values for your keys.
Expected output: cat .env shows every variable with a real value. No placeholders should remain for the credentials you plan to use.
Step 3: Define the invoice data types with Zod
The pipeline’s foundation is a set of Zod schemas that define what a valid invoice looks like. Every extraction result is validated against these schemas.
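Below is a rough sketch of what src/types/invoice.ts could contain. It assumes Zod and reads the field list and constraints off the extraction prompt in Step 6. The exact refinements in the recipe's file may differ, and the date regex is an illustrative choice.

```typescript
import { z } from "zod";

// Hypothetical reconstruction: fields mirror the prompt in buildInvoiceExtractionPrompt.
const isoDate = z.string().regex(/^\d{4}-\d{2}-\d{2}$/, "expected YYYY-MM-DD");

export const LineItemSchema = z.object({
  description: z.string().min(1),
  quantity: z.number().positive(),
  unitAmount: z.number().positive(),
  lineAmount: z.number().optional(),
  accountCode: z.string().optional(),
  taxType: z.string().optional(),
  taxAmount: z.number().optional(),
});

export const InvoiceDataSchema = z.object({
  invoiceNumber: z.string().min(1),
  supplierName: z.string().min(1),
  supplierAddress: z.string().optional(),
  customerName: z.string().optional(),
  customerAddress: z.string().optional(),
  lineItems: z.array(LineItemSchema).min(1), // at least one line item
  subtotal: z.number().optional(),
  tax: z.number().optional(),
  total: z.number().optional(),
  currencyCode: z.string().optional(),
  invoiceDate: isoDate,
  dueDate: isoDate,
  reference: z.string().optional(),
  bankAccount: z.string().optional(),
});

// Parsed (output) types, plus the pre-parse input type.
export type LineItem = z.infer<typeof LineItemSchema>;
export type InvoiceData = z.infer<typeof InvoiceDataSchema>;
export type InvoiceDataInput = z.input<typeof InvoiceDataSchema>;
```

With these constraints, an invoice that has an empty lineItems array or a date such as "31/01/2024" fails safeParse. The output-repair stage then gets a chance to fix it.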
```ts
// src/services/ocr-service.ts
import { createWorker } from "tesseract.js";

export class OcrError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "OcrError";
  }
}

export async function performOcr(buffer: Uint8Array): Promise<string> {
  let worker;
  try {
    worker = await createWorker("eng");
    const imageBuffer = Buffer.from(buffer);
    const ret = await worker.recognize(imageBuffer);
    return ret.data.text;
  } catch (error) {
    // A WASM load failure is fatal; any other OCR failure yields empty text.
    if (error instanceof Error && error.message.includes("WASM")) {
      throw new OcrError(`WASM load failed: ${error.message}`);
    }
    return "";
  } finally {
    // Always release the worker, even when recognition fails.
    if (worker) {
      await worker.terminate();
    }
  }
}
```
Expected output: Both files typecheck. parsePdf returns { text, totalPages } on success. performOcr returns an empty string on unreadable images and throws OcrError on WASM failures.
Step 5: Create the document loader with concurrency control
The document loader routes by MIME type: PDFs go to parsePdf, and images go to performOcr. Each load runs through p-limit with a concurrency of 3, so a burst of parallel uploads never parses more than three documents at once.
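Here is a dependency-free sketch of the same idea. It makes several assumptions. The parsers are injected as stubs. createLimiter is a hand-rolled stand-in for p-limit (in the recipe, pLimit(3) plays this role). The "image" sourceType value and the image/* match are hypothetical.

```typescript
// Hypothetical sketch: MIME routing plus a p-limit-style concurrency cap.
export class UnsupportedSourceError extends Error {
  constructor(mimeType: string) {
    super(`Unsupported MIME type: ${mimeType}`);
    this.name = "UnsupportedSourceError";
  }
}

export interface Parsers {
  parsePdf: (bytes: Uint8Array) => Promise<string>;
  performOcr: (bytes: Uint8Array) => Promise<string>;
}

export interface LoadedDocument {
  sourceType: "pdf" | "image";
  text: string;
}

// Stand-in for pLimit(n): at most `concurrency` tasks run at once; the rest queue.
export function createLimiter(concurrency: number) {
  let active = 0;
  const waiting: Array<() => void> = [];
  const release = (): void => {
    const next = waiting.shift();
    if (next) {
      next(); // hand this slot straight to the next queued task
    } else {
      active--;
    }
  };
  return async <T>(task: () => Promise<T>): Promise<T> => {
    if (active >= concurrency) {
      await new Promise<void>((resolve) => waiting.push(() => resolve()));
    } else {
      active++;
    }
    try {
      return await task();
    } finally {
      release();
    }
  };
}

const limit = createLimiter(3);

export function loadDocument(
  bytes: Uint8Array,
  mimeType: string,
  parsers: Parsers,
): Promise<LoadedDocument> {
  return limit(async (): Promise<LoadedDocument> => {
    if (mimeType === "application/pdf") {
      return { sourceType: "pdf", text: await parsers.parsePdf(bytes) };
    }
    if (mimeType.startsWith("image/")) {
      return { sourceType: "image", text: await parsers.performOcr(bytes) };
    }
    throw new UnsupportedSourceError(mimeType);
  });
}
```

The limiter hands a finished task's slot directly to the next waiter instead of decrementing and re-incrementing the counter. This keeps a newly arriving call from slipping in ahead of queued work and briefly exceeding the cap.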
Expected output: loadDocument with a PDF source calls parsePdf and returns sourceType: "pdf". With an image source, it calls performOcr. An unsupported MIME type throws UnsupportedSourceError.
Step 6: Build the schema builder and output repair
The schema-builder creates the prompt string that tells GPT-5.2 what fields to extract, and validates the response against the Zod schema.
```ts
// src/extraction/schema-builder.ts
import { InvoiceDataSchema } from "../types/invoice.js";
import type { InvoiceData } from "../types/invoice.js";
import { ZodError } from "zod";

export class ValidationError extends Error {
  public issues: { path: string; message: string }[];

  constructor(message: string, issues: { path: string; message: string }[] = []) {
    super(message);
    this.name = "ValidationError";
    this.issues = issues;
  }
}

export function buildInvoiceExtractionPrompt(rawText: string): string {
  return `You are an invoice data extraction assistant. Extract the following fields from the invoice text and return a JSON object matching this exact schema:
{
  "invoiceNumber": string (required),
  "supplierName": string (required),
  "supplierAddress": string (optional),
  "customerName": string (optional),
  "customerAddress": string (optional),
  "lineItems": [
    {
      "description": string (required),
      "quantity": number (positive, required),
      "unitAmount": number (positive, required),
      "lineAmount": number (optional),
      "accountCode": string (optional),
      "taxType": string (optional),
      "taxAmount": number (optional)
    }
  ] (required, at least one item),
  "subtotal": number (optional),
  "tax": number (optional),
  "total": number (optional),
  "currencyCode": string (optional),
  "invoiceDate": string (YYYY-MM-DD, required),
  "dueDate": string (YYYY-MM-DD, required),
  "reference": string (optional),
  "bankAccount": string (optional)
}
Return ONLY the JSON object, no markdown formatting, no explanation.
Invoice text:
${rawText}`;
}

export function validateExtraction(raw: unknown): InvoiceData {
  try {
    return InvoiceDataSchema.parse(raw);
  } catch (err) {
    if (err instanceof ZodError) {
      const issues = err.issues.map((issue) => ({
        path: issue.path.join("."),
        message: issue.message,
      }));
      throw new ValidationError(
        `Invoice data validation failed: ${issues.map((i) => `${i.path}: ${i.message}`).join("; ")}`,
        issues
      );
    }
    throw new ValidationError(`Unexpected validation error: ${String(err)}`);
  }
}

export function safeParseExtraction(
  raw: unknown
): { data: InvoiceData | null; errors: string[] } {
  const result = InvoiceDataSchema.safeParse(raw);
  if (result.success) {
    return { data: result.data, errors: [] };
  }
  const errors = result.error.issues.map(
    (issue) => `${issue.path.join(".")}: ${issue.message}`
  );
  return { data: null, errors };
}
```
The output-repair module is the safety net. When GPT returns malformed JSON — string numbers, whitespace-padded fields, missing optionals, or completely wrong shapes — it attempts automatic correction before falling back to regex extraction from the raw text.
```ts
// src/extraction/output-repair.ts
import { safeParseExtraction } from "./schema-builder.js";
import type { InvoiceData } from "../types/invoice.js";

function trimStrings(obj: Record<string, unknown>): void {
  for (const key of Object.keys(obj)) {
    const val = obj[key];
    if (typeof val === "string") {
      obj[key] = val.trim();
    } else if (val !== null && typeof val ===
```
Expected output: a fully valid extraction passes through with warnings: []. A malformed one with "123.45" as a numeric string gets coerced to a number. A completely empty input returns fallback data, with warnings describing every repair.
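Before any of those repairs can run, the raw model text has to become an object. The following is a minimal sketch of that first stage. It assumes the model occasionally wraps its JSON in a Markdown code fence or surrounds it with a sentence despite the prompt. parseModelJson is a hypothetical helper, not part of the recipe.

```typescript
// Hypothetical helper: turn raw model text into a plain object, or null.
export function parseModelJson(raw: string): Record<string, unknown> | null {
  let text = raw.trim();
  // Strip a surrounding Markdown code fence (three backticks, optional "json" tag).
  const fenced = /^`{3}(?:json)?\s*([\s\S]*?)\s*`{3}$/i.exec(text);
  if (fenced) {
    text = fenced[1] ?? "";
  }
  // Fall back to the outermost {...} span when prose surrounds the JSON.
  const start = text.indexOf("{");
  const end = text.lastIndexOf("}");
  if (start === -1 || end <= start) {
    return null;
  }
  try {
    const parsed: unknown = JSON.parse(text.slice(start, end + 1));
    return parsed !== null && typeof parsed === "object" && !Array.isArray(parsed)
      ? (parsed as Record<string, unknown>)
      : null;
  } catch {
    return null; // still not JSON: leave it to the regex fallback
  }
}
```

Values such as "123.45" come through as strings at this stage. Converting them is left to the coercion step of the repair module.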
Step 7: Set up Langfuse observability and Next.js instrumentation
Observability helps you trace every API call, measure latency, and debug extraction failures.
```ts
// src/lib/langfuse.ts
import {
  Langfuse,
  LangfuseTraceClient,
  LangfuseGenerationClient,
} from "langfuse";

let instance: Langfuse | null = null;

export function initLangfuse(): Langfuse {
  if (!instance) {
    instance = new Langfuse({
      publicKey: process.env.LANGFUSE_PUBLIC_KEY ?? "",
      secretKey: process.env.LANGFUSE_SECRET_KEY ?? "",
      baseUrl: process.env.LANGFUSE_BASE_URL ?? "",
    });
  }
  return instance;
}

export function traceExtraction(name: string): LangfuseTraceClient {
  return initLangfuse().trace({ name });
}

export function observeGeneration(
  trace: LangfuseTraceClient,
  name: string
): LangfuseGenerationClient {
  return trace.generation({ name });
}
```
The instrumentation.ts file initializes Langfuse at server startup. Because register() runs in both Node and Edge runtimes, the dynamic import() protects the Node-only langfuse module from loading in Edge context.
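Here is a sketch of the usual Next.js shape for such a file. It assumes instrumentation.ts sits at the project root next to app/ and that initLangfuse is exported from src/lib/langfuse.ts as above. The NEXT_RUNTIME check is what ensures the Node-only import actually runs only in the Node.js runtime.

```typescript
// instrumentation.ts (project root); a sketch, not the recipe's exact file.
export async function register(): Promise<void> {
  // register() runs once per runtime; load langfuse only under Node.js.
  if (process.env.NEXT_RUNTIME === "nodejs") {
    const { initLangfuse } = await import("./src/lib/langfuse");
    initLangfuse();
  }
}
```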
Expected output: when the Next.js dev server starts, register() fires and the Langfuse singleton initializes. On Next.js 15 and later, instrumentation.ts is picked up without any experimental flag.
Step 8: Build the S3 storage and Xero integration
Documents need persistent storage. The S3Storage class wraps @aws-sdk/client-s3 with p-retry for transient failures:
```ts
// src/integrations/s3-storage.ts
import { S3Client, PutObjectCommand, GetObjectCommand } from "@aws-sdk/client-s3";
import pRetry from "p-retry";

export class StorageError extends Error {
  constructor(message: string, cause?: unknown) {
    super(message, { cause });
    this.name = "StorageError";
  }
}

export class S3Storage {
  private client: S3Client;
  private bucketName: string;

  constructor() {
    const region = process.env.AWS_REGION;
    const bucketName = process.env.S3_BUCKET_NAME;
    const accessKeyId = process.env.AWS_ACCESS_KEY_ID;
    const secretAccessKey = process.env.AWS_SECRET_ACCESS_KEY;
    if (!region || !bucketName || !accessKeyId || !secretAccessKey) {
      throw new Error(
        "Missing required AWS environment variables: AWS_REGION, S3_BUCKET_NAME, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY"
      );
    }
    this.bucketName = bucketName;
    this.client = new S3Client({
      region,
      credentials: { accessKeyId, secretAccessKey },
    });
  }

  async uploadDocument(key: string, body: Uint8Array, contentType: string): Promise<string> {
    return pRetry(
      async () => {
        try {
          await this.client.send(
            new PutObjectCommand({
              Bucket: this.bucketName,
              Key: key,
              Body: body,
              ContentType: contentType,
            })
          );
          return key;
        } catch (err) {
          throw new StorageError(
            `Failed to upload document to s3://${this.bucketName}/${key}: ${String(err)}`,
            err
          );
        }
      },
      { retries: 3 }
    );
  }

  async getDocument(key: string): Promise<Uint8Array> {
    return pRetry(
      async () => {
        try {
          const response = await this.client.send(
            new GetObjectCommand({ Bucket: this.bucketName, Key: key })
          );
          const stream = response.Body;
          if (!stream) {
            throw new StorageError("Empty response body from S3");
          }
          const chunks: Uint8Array[] = [];
          for await (const chunk of stream as AsyncIterable<Uint8Array>) {
            chunks.push(chunk);
          }
          const totalLength = chunks.reduce((acc, c) => acc + c.length, 0);
          const result = new Uint8Array(totalLength);
          let offset = 0;
          for (const chunk of chunks) {
            result.set(chunk, offset);
            offset += chunk.length;
          }
          return result;
        } catch (err) {
          if (err instanceof StorageError) {
            throw err;
          }
          throw new StorageError(
            `Failed to get document from s3://${this.bucketName}/${key}: ${String(err)}`,
            err
          );
        }
      },
      { retries: 3 }
    );
  }
}
```
The XeroInvoiceIntegration maps extracted invoice data to Xero’s API shape and handles auth, validation errors, and rate limits:
```ts
// src/integrations/xero.ts
import { XeroClient, Invoice } from "xero-node";
import pRetry, { AbortError } from "p-retry";
import type { InvoiceData } from "../types/invoice.js";
import type { XeroInvoiceRequest } from "../types/xero.js";

export class XeroAuthError extends Error {
  constructor(message: string, cause?: unknown) {
    super(message, { cause });
    this.name = "XeroAuthError";
  }
}

export class XeroValidationError extends Error {
  constructor(message: string
```
Expected output: both files typecheck. S3Storage wraps uploads and downloads in p-retry with up to three retries, and XeroInvoiceIntegration handles an expired token by re-authenticating once.
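At the heart of a Xero integration like this is a pure mapping from extracted data to an invoice payload. This sketch rests on three assumptions. First, extracted invoices are supplier bills (Xero type ACCPAY). Second, they are posted as drafts. Third, field names follow xero-node's camelCase Invoice model. mapToXeroBill and the local types are hypothetical; they are not the recipe's XeroInvoiceRequest.

```typescript
// Minimal local slices of an extracted invoice and a Xero-style bill payload.
interface ExtractedLine {
  description: string;
  quantity: number;
  unitAmount: number;
  accountCode?: string | undefined;
  taxType?: string | undefined;
}

interface ExtractedInvoice {
  invoiceNumber: string;
  supplierName: string;
  lineItems: ExtractedLine[];
  invoiceDate: string;
  dueDate: string;
  currencyCode?: string | undefined;
  reference?: string | undefined;
}

interface XeroBillPayload {
  type: "ACCPAY";
  status: "DRAFT";
  contact: { name: string };
  invoiceNumber: string;
  date: string;
  dueDate: string;
  currencyCode?: string | undefined;
  reference?: string | undefined;
  lineItems: ExtractedLine[];
}

// Hypothetical mapping: supplier invoices become draft bills in Xero.
export function mapToXeroBill(inv: ExtractedInvoice): XeroBillPayload {
  return {
    type: "ACCPAY",
    status: "DRAFT",
    contact: { name: inv.supplierName },
    invoiceNumber: inv.invoiceNumber,
    date: inv.invoiceDate,
    dueDate: inv.dueDate,
    // Empty strings (e.g. output-repair defaults) are dropped instead of sent.
    currencyCode: inv.currencyCode || undefined,
    reference: inv.reference || undefined,
    lineItems: inv.lineItems.map((li) => ({
      description: li.description,
      quantity: li.quantity,
      unitAmount: li.unitAmount,
      accountCode: li.accountCode || undefined,
      taxType: li.taxType || undefined,
    })),
  };
}
```

Posting as DRAFT is a design choice in this sketch. It leaves a second human checkpoint inside Xero even for auto-routed extractions.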
Step 9: Create the budget controller and confidence router
The budget controller uses @reaatech/agent-budget-engine to enforce a monthly spend cap:
```ts
// src/budget/budget-controller.ts
import { BudgetController } from "@reaatech/agent-budget-engine";
import { SpendStore } from "@reaatech/agent-budget-spend-tracker";
import { BudgetScope } from "@reaatech/agent-budget-types";
import type { BudgetCheckResult } from "@reaatech/agent-budget-types";

export { SpendStore, BudgetScope };
export type { BudgetCheckResult };

export function createBudgetController(): BudgetController {
  const spendTracker = new SpendStore({ maxEntries: 10000 });
  return new BudgetController({ spendTracker });
}

export function defineDefaultBudget(ctrl: BudgetController, limitDollars: number): void {
  ctrl.defineBudget({
    scopeType: BudgetScope.Task,
    scopeKey: "default",
    limit: limitDollars,
    policy: {
      softCap: 0.8,
      hardCap: 1.0,
      autoDowngrade: [],
      disableTools: [],
    },
  });
}

export function checkExtractionBudget(ctrl: BudgetController, estimatedCost: number): BudgetCheckResult {
  return ctrl.check({
    scopeType: BudgetScope.Task,
    scopeKey: "default",
    estimatedCost,
    modelId: "gpt-5.2",
    tools: [],
  });
}

export function recordExtractionSpend(
  ctrl: BudgetController,
  entry: { cost: number; inputTokens: number; outputTokens: number }
): void {
  ctrl.record({
    requestId: crypto.randomUUID(),
    scopeType: BudgetScope.Task,
    scopeKey: "default",
    cost: entry.cost,
    inputTokens: entry.inputTokens,
    outputTokens: entry.outputTokens,
    modelId: "gpt-5.2",
    provider: "openai",
    timestamp: new Date(),
  });
}
```
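recordExtractionSpend needs a dollar cost, which the pipeline has to derive from the token usage the Responses API reports (usage.input_tokens and usage.output_tokens). The sketch below shows that arithmetic. It also shows one plausible reading of the softCap/hardCap fractions: a fraction of the limit at which to warn, and one at which to block. The prices are placeholders, not OpenAI's published rates. How @reaatech/agent-budget-engine actually evaluates the policy is an assumption here.

```typescript
// Placeholder per-million-token prices; substitute your model's real rates.
export interface TokenPrices {
  inputPerMillion: number;
  outputPerMillion: number;
}

export interface Usage {
  inputTokens: number;
  outputTokens: number;
}

// Dollar cost of one call: tokens / 1e6 * price-per-million, per direction.
export function estimateCost(usage: Usage, prices: TokenPrices): number {
  return (
    (usage.inputTokens / 1_000_000) * prices.inputPerMillion +
    (usage.outputTokens / 1_000_000) * prices.outputPerMillion
  );
}

export type CapStatus = "ok" | "soft" | "hard";

// Hypothetical interpretation of softCap/hardCap as fractions of the limit.
export function capStatus(
  spent: number,
  estimated: number,
  limit: number,
  policy: { softCap: number; hardCap: number },
): CapStatus {
  const projected = (spent + estimated) / limit;
  if (projected >= policy.hardCap) return "hard";
  if (projected >= policy.softCap) return "soft";
  return "ok";
}
```

Under this reading, the template's BUDGET_MONTHLY_LIMIT=50.0 with softCap 0.8 would start warning once projected spend reaches $40.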
The confidence router decides whether an extraction is trustworthy enough to auto-post to Xero, or if it needs human review:
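The recipe delegates this decision to @reaatech/confidence-router. The following is a dependency-free sketch of the behaviour this step's expected output describes. The three route labels, the rule that any repair warning blocks auto-posting, and the function names are assumptions for illustration.

```typescript
// Hypothetical sketch; not the @reaatech/confidence-router API.
export type Route = "route" | "review" | "fallback";

export interface Thresholds {
  route: number;
  fallback: number;
}

// Read thresholds from an env-like record, defaulting to 0.8 and 0.3.
export function readThresholds(env: Record<string, string | undefined>): Thresholds {
  const parse = (value: string | undefined, defaultValue: number): number => {
    const n = Number(value);
    return value !== undefined && value.trim() !== "" && Number.isFinite(n) ? n : defaultValue;
  };
  return {
    route: parse(env["CONFIDENCE_ROUTE_THRESHOLD"], 0.8),
    fallback: parse(env["CONFIDENCE_FALLBACK_THRESHOLD"], 0.3),
  };
}

// Auto-post only when confidence clears the route threshold with no repair warnings.
export function decide(confidence: number, warningCount: number, t: Thresholds): Route {
  if (confidence >= t.route && warningCount === 0) return "route";
  if (confidence >= t.fallback) return "review";
  return "fallback";
}
```

This section does not specify what happens below the fallback threshold; the sketch only labels that case.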
Expected output: createBudgetController() returns a BudgetController instance. createConfidenceRouter() reads thresholds from CONFIDENCE_ROUTE_THRESHOLD (default 0.8) and CONFIDENCE_FALLBACK_THRESHOLD (default 0.3). A completeness of 1.0 with zero warnings routes the extraction, while a completeness of 0.3 with warnings triggers the review queue.
Step 10: Wire it all together — the extraction pipeline
The InvoiceExtractionPipeline orchestrates the full flow: budget check, document loading, OpenAI vision call (with multimodal input for images, text for PDFs), validation, repair, embedding, confidence scoring, budget recording, and S3 archival.
```ts
// src/extraction/pipeline.ts
import OpenAI from "openai";
import type { ResponseInput } from "openai/resources/responses/responses";
import { EmbeddingService } from "@reaatech/hybrid-rag-embedding";
import { BudgetController } from "@reaatech/agent-budget-engine";
import { BudgetScope } from "@reaatech/agent-budget-types";
import { ConfidenceRouter } from "@reaatech/confidence-router";
import { S3Storage } from "../integrations/s3-storage.js";
import { loadDocument } from "../services/document-loader.js";
import { buildInvoiceExtractionPrompt } from "./schema-builder.js";
import { repairExtraction } from "./output-repair.js";
```
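The "vision for images, text for PDFs" split mostly comes down to building the right input array. This sketch uses local types that mirror the Responses API's input_text and input_image content parts. It assumes the image bytes are already base64-encoded. buildInvoiceInput is hypothetical. With the openai package, the result would be passed as input to client.responses.create({ model, input }), and the reply read from response.output_text.

```typescript
// Local types mirroring the Responses API content parts used here.
type InputText = { type: "input_text"; text: string };
type InputImage = { type: "input_image"; image_url: string; detail: "low" | "high" | "auto" };
type UserMessage = { role: "user"; content: Array<InputText | InputImage> };

// Hypothetical helper: text-only input for PDFs; text plus an inline image for scans.
export function buildInvoiceInput(
  prompt: string,
  image?: { base64: string; mimeType: string },
): UserMessage[] {
  const content: Array<InputText | InputImage> = [{ type: "input_text", text: prompt }];
  if (image) {
    content.push({
      type: "input_image",
      // Data URL so no separate file upload is needed.
      image_url: `data:${image.mimeType};base64,${image.base64}`,
      detail: "auto",
    });
  }
  return [{ role: "user", content }];
}
```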
Expected output: pnpm typecheck passes. The pipeline's extract() method follows the full flow: budget check → document loading → OpenAI call (vision for images, text for PDFs) → JSON parse → output repair → budget recording → embedding → confidence routing → S3 upload → return ExtractionResult.
Step 11: Create the API routes
The POST /api/ingest route accepts multipart file uploads, validates size and MIME type, performs a budget check, then runs the pipeline.
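Here is a minimal, framework-free sketch of the size and MIME validation step. The 10 MB cap and the allowed types are assumptions for illustration, not values from the recipe. In the route handler, the input would be the entry returned by (await request.formData()).get("file"), after checking that it is a File rather than a string.

```typescript
// Assumed limits; not the recipe's actual values.
const MAX_BYTES = 10 * 1024 * 1024;
const ALLOWED_TYPES = new Set(["application/pdf", "image/png", "image/jpeg", "image/webp"]);

export type UploadCheck =
  | { ok: true }
  | { ok: false; status: 400 | 413 | 415; error: string };

// Reject bad uploads before any paid API call or budget check runs.
export function checkUpload(file: { size: number; type: string } | null): UploadCheck {
  if (!file) return { ok: false, status: 400, error: "Missing 'file' field" };
  if (file.size === 0) return { ok: false, status: 400, error: "Empty file" };
  if (file.size > MAX_BYTES) return { ok: false, status: 413, error: "File too large" };
  if (!ALLOWED_TYPES.has(file.type)) {
    return { ok: false, status: 415, error: `Unsupported type: ${file.type || "unknown"}` };
  }
  return { ok: true };
}
```

Returning an HTTP status with each failure lets the route map it directly onto NextResponse.json(..., { status }).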
The invoices route supports pagination and status filtering:
```ts
// app/api/invoices/route.ts
import { type NextRequest, NextResponse } from "next/server";
import { extractionStore, type ExtractionStatus } from "../extractions/store";

export function GET(request: NextRequest) {
  const { searchParams } = new URL(request.url);
  const status = searchParams.get("status") as ExtractionStatus | null;
  const page = Math.max(1, parseInt(searchParams.get("page") ?? "1", 10) || 1);
  const limit = Math.min(
    100,
    Math.max(1, parseInt(searchParams.get("limit") ?? "20", 10) || 20)
  );
  const allItems = Array.from(extractionStore.values());
  const filtered = status ? allItems.filter((item) => item.status === status) : allItems;
  const total = filtered.length;
  const start = (page - 1) * limit;
  const invoices = filtered.slice(start, start + limit);
  return NextResponse.json({ invoices, total, page, limit });
}
```
Expected output: Start the dev server with pnpm dev. Upload a PDF invoice to POST /api/ingest with the file multipart field. The response includes an extractionId, status, confidence, and the structured invoice data. If requiresReview is true, open the review queue at GET /api/review.
Step 12: Write the golden evaluation comparator
The evaluation module uses @reaatech/agent-eval-harness-golden to compare extraction results against golden trajectories:
```ts
// src/evaluation/golden-comparator.ts
import { quickCreateGolden, compareAgainstGolden } from "@reaatech/agent-eval-harness-golden";
import type { GoldenTrajectory } from "@reaatech/agent-eval-harness-golden";
import type { ExtractionResult } from "../types/extraction.js";
import type { Trajectory } from "@reaatech/agent-eval-harness-types";

// Wrap one extraction result as a single-turn trajectory for comparison.
function extractionResultToTrajectory(result: ExtractionResult): Trajectory {
  const trajectory: Trajectory = {
    turns: [
      {
        turn_id: 1,
        role: "user",
        content: JSON.stringify(result.structured),
        timestamp: new Date().toISOString(),
        golden: true,
        expected: true,
        quality_notes: `confidence: ${String(result.confidence)}, status: ${result.status}, warnings: ${result.warnings.join(", ")}`,
      },
    ],
    metadata: {
      start_time: new Date().toISOString(),
      end_time: new Date().toISOString(),
      total_turns: 1,
      // Confidence is carried in the total_cost slot; no dollar cost is recorded here.
      total_cost: result.confidence,
    },
  };
  return trajectory;
}

export function createGoldenFromExtraction(result: ExtractionResult): GoldenTrajectory {
  const trajectory = extractionResultToTrajectory(result);
  return quickCreateGolden(trajectory, "invoice-extraction", ["invoice"]);
}

export function compareWithGolden(
  candidate: ExtractionResult,
  golden: GoldenTrajectory
): ReturnType<typeof compareAgainstGolden> {
  const candidateTrajectory = extractionResultToTrajectory(candidate);
  return compareAgainstGolden(golden, candidateTrajectory, {
    similarityThreshold: 0.85,
  });
}
```
Step 13: Write and run the tests
Tests mock every external dependency — MSW for OpenAI, vi.mock for local modules, and aws-sdk-client-mock for S3.
Finally, create the source index to re-export everything:
```ts
// src/index.ts
export const SCAFFOLD_VERSION = "0.1.0" as const;

export { InvoiceDataSchema, LineItemSchema } from "./types/invoice.js";
export type { InvoiceData, InvoiceDataInput, LineItem } from "./types/invoice.js";
export type {
  ExtractionResult,
  ExtractionMetadata,
  ExtractionStatus,
  DocumentSource,
} from "./types/extraction.js";
export type { XeroInvoiceRequest, XeroMappingResult } from "./types/xero.js";
export { parsePdf, ParseError } from "./services/pdf-parser.js";
export { performOcr, OcrError } from "./services/ocr-service.js";
export { loadDocument, UnsupportedSourceError } from "./services/document-loader.js";
export { initLangfuse, traceExtraction, observeGeneration } from "./lib/langfuse.js";
export { InvoiceExtractionPipeline } from "./extraction/pipeline.js";
export {
  XeroInvoiceIntegration,
  XeroAuthError,
  XeroValidationError,
  XeroRateLimitError,
} from "./integrations/xero.js";
export { S3Storage, StorageError } from "./integrations/s3-storage.js";
export { repairExtraction } from "./extraction/output-repair.js";
export { createConfidenceRouter, decideExtraction } from "./classification/confidence-router.js";
export {
  createBudgetController,
  defineDefaultBudget,
  checkExtractionBudget,
  recordExtractionSpend,
} from "./budget/budget-controller.js";
export type { BudgetCheckResult } from "./budget/budget-controller.js";
```
Run the full test suite:
```terminal
pnpm test
```
Expected output: numFailedTests: 0, numTotalTests >= 50, and all four coverage metrics (lines, branches, functions, statements) at 90% or above.
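Vitest can enforce that coverage bar in its config, so a run fails when any metric drops below it. The following sketch assumes Vitest 1.x or later with the V8 provider (@vitest/coverage-v8 installed). The recipe's own vitest.config.ts may differ. Thresholds only apply when coverage is collected, for example with vitest run --coverage.

```typescript
// vitest.config.ts (sketch)
import { defineConfig } from "vitest/config";

export default defineConfig({
  test: {
    environment: "node",
    coverage: {
      provider: "v8",
      // Fail the run if any metric falls below 90%.
      thresholds: { lines: 90, branches: 90, functions: 90, statements: 90 },
    },
  },
});
```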
Next steps
Add multi-tenant support — scope the in-memory extraction store and Xero credentials per organization, using a middleware that reads a tenant header.
Replace the in-memory store — swap the Map<string, ExtractionResult> for a PostgreSQL-backed store so data survives server restarts and scales across instances.
Add a webhook notification system — when an extraction is approved or rejected, fire a webhook to the SMB’s own systems.
Train custom confidence routing — use @reaatech/agent-eval-harness-golden golden trajectories to tune the route threshold automatically based on historical approval rates.
Batch OCR fallback — add a local Tesseract worker pool to handle high-resolution image invoices that exceed OpenAI’s size limits.
"object" && !Array.isArray(val)) {
      trimStrings(val as Record<string, unknown>);
    }
  }
}
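// Only fields the invoice schema types as numbers are coerced, so numeric-looking
// identifiers such as invoiceNumber "10023" or accountCode "200" stay strings.
const NUMERIC_KEYS = new Set([
  "quantity",
  "unitAmount",
  "lineAmount",
  "taxAmount",
  "subtotal",
  "tax",
  "total",
]);

function coerceNumbers(obj: Record<string, unknown>): void {
  for (const key of Object.keys(obj)) {
    const val = obj[key];
    if (typeof val === "string" && NUMERIC_KEYS.has(key)) {
      const num = Number(val);
      if (!isNaN(num) && val.trim().length > 0) {
        obj[key] = num;
      }
    } else if (Array.isArray(val)) {
      // Recurse into arrays such as lineItems.
      for (const item of val) {
        if (item !== null && typeof item === "object") {
          coerceNumbers(item as Record<string, unknown>);
        }
      }
    } else if (val !== null && typeof val === "object") {
      coerceNumbers(val as Record<string, unknown>);
    }
  }
}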
function defaultMissingOptionals(obj: Record<string, unknown>): void {
if (obj.lineItems === undefined) {
obj.lineItems = [];
}
if (obj.supplierAddress === undefined) {
obj.supplierAddress = "";
}
if (obj.customerName === undefined) {
obj.customerName = "";
}
if (obj.customerAddress === undefined) {
obj.customerAddress = "";
}
if (obj.subtotal === undefined) {
obj.subtotal = 0;
}
if (obj.tax === undefined) {
obj.tax = 0;
}
if (obj.total === undefined) {
obj.total = 0;
}
if (obj.currencyCode === undefined) {
obj.currencyCode = "";
}
if (obj.reference === undefined) {
obj.reference = "";
}
if (obj.bankAccount === undefined) {
obj.bankAccount = "";
}
}
function regexExtractValue(text: string, pattern: RegExp): string | null {