Cohere Document Pipeline for BigCommerce SMB Order Processing
Automatically scan and process emailed purchase orders and quote requests into BigCommerce, cutting order entry time from minutes to seconds.
The problem
Small e-commerce merchants on BigCommerce waste hours manually transcribing orders from emailed PDFs and scanned attachments into the platform, leading to data entry errors and delayed fulfillment.
Example artifact
A complete, working implementation of this recipe, downloadable as a zip or browsable file by file. Generated by our build pipeline; tested with full coverage before publishing.
200 kB · 113 tests · 95.9% coverage · vitest passing
SHA-256 35860560e2d02acb6ea33e3a8481543ef748418d65f226607698f30e68a02624
Las Vegas, Nevada USA © 2026 REAA Technologies Inc. — Open-Source AI Solutions for Small Business.
Intro
This recipe builds a document-to-order pipeline for small e-commerce merchants on BigCommerce. You’ll create a Fastify server that accepts uploaded PDFs, DOCX files, and scanned images, extracts their text, classifies the content into structured order data using Cohere’s Command A model, repairs any malformed JSON output, and pushes the order directly into BigCommerce via its REST API. A Next.js dashboard lets you monitor order counts, LLM costs, and recent errors.
Prerequisites
Node.js 22+ and pnpm 10+ installed
A Cohere API key (CO_API_KEY) with access to the command-a-03-2025 model
An OpenAI API key (OPENAI_API_KEY) for the embedding-based LLM cache
A BigCommerce store with API credentials (store hash, username, token) that has Orders API access
Langfuse account credentials (optional — the service degrades to no-ops when keys are absent)
Familiarity with TypeScript, Next.js App Router, and Fastify
Step 1: Scaffold the project and set up environment
Start with a fresh Next.js project using the App Router, install the exact-pinned dependencies, and configure your environment variables.
npx create-next-app@latest cohere-document-pipeline \
--typescript --app --src-dir --import-alias "@/*" \
--use-pnpm
cd cohere-document-pipeline
Add the runtime dependencies:
pnpm add fastify@5.8.5 @fastify/multipart@10.0.0
pnpm add cohere-ai@8.0.0 pdfjs-dist@6.0.227 mammoth@1.12.0
pnpm add sharp@0.35.2 tesseract.js@7.0.0 file-type@22.0.1
pnpm add p-retry@8.0.0 zod@4.4.3 langfuse@3.38.20
pnpm add @reaatech/llm-cache@0.1.0 @reaatech/llm-cost-telemetry@0.2.0
pnpm add @reaatech/media-pipeline-mcp-doc-extraction@0.3.0
pnpm add @reaatech/structured-repair-core@1.0.0
pnpm add -D tsx@4.22.4 msw@2.14.6
Next, create your .env file. Every variable used by the system is listed in .env.example:
NODE_ENV=development
CO_API_KEY=<your-cohere-key>
OPENAI_API_KEY=<your-openai-key>
BIGCOMMERCE_STORE_HASH=<your-store-hash>
BIGCOMMERCE_API_USERNAME=<your-api-username>
BIGCOMMERCE_API_TOKEN=<your-api-token>
LANGFUSE_PUBLIC_KEY=<your-langfuse-public-key>
LANGFUSE_SECRET_KEY=<your-langfuse-secret-key>
LANGFUSE_HOST=<your-langfuse-host>
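PORT=3001
Add the Fastify dev script to the "scripts" section of package.json:
"dev:fastify": "tsx watch src/server.ts"
Finally, set the Next.js instrumentation hook in next.config.ts so the server startup hooks run. On Next.js 15 and later, instrumentation is stable and the experimental.instrumentationHook flag is no longer needed: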
const nextConfig = {
experimental: { instrumentationHook: true },
};
export default nextConfig;
Expected output: pnpm install succeeds, pnpm typecheck passes without errors.
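Because the server listing later falls back to empty strings when a variable is unset, a missing key only surfaces at the first API call. A minimal sketch of a startup check is shown below; the helper name findMissingEnv is hypothetical and not part of the recipe, and the key list simply mirrors the .env listing above (Langfuse keys are left out because the recipe treats them as optional).

```typescript
// Hypothetical helper, not part of the published artifact.
// The key names are copied from the .env listing above.
const REQUIRED_ENV_KEYS = [
  "CO_API_KEY",
  "OPENAI_API_KEY",
  "BIGCOMMERCE_STORE_HASH",
  "BIGCOMMERCE_API_USERNAME",
  "BIGCOMMERCE_API_TOKEN",
] as const;

type EnvLike = Record<string, string | undefined>;

// Returns every required key that is unset or contains only whitespace.
function findMissingEnv(env: EnvLike): string[] {
  return REQUIRED_ENV_KEYS.filter((key) => {
    const value = env[key];
    return value === undefined || value.trim() === "";
  });
}

// Example: two keys supplied, one of them blank.
const missingKeys = findMissingEnv({ CO_API_KEY: "test-key", OPENAI_API_KEY: " " });
console.log(`Missing: ${missingKeys.join(", ")}`);
```

In a real entry point you would pass process.env and exit early when the returned list is non-empty.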
Step 2: Define the Zod schemas
Create src/schemas/address.ts, the foundational address schema used by customer info:
import { z } from "zod" ;
export const AddressSchema = z. object ({
line1: z. string (),
line2: z. string (). optional (),
city: z. string (),
state: z. string (),
postalCode: z. string (),
country: z. string (),
});
export type Address = z.infer<typeof AddressSchema>;
Create src/schemas/order.ts, the full order schema combining line items, customer info, totals, and metadata:
import { z } from "zod" ;
import { AddressSchema } from "./address.js" ;
export const LineItemSchema = z. object ({
sku: z. string (),
name: z. string (),
quantity: z. number (). int (). positive (),
unitPrice: z. number (). nonnegative (),
totalPrice: z. number (). nonnegative (),
});
export const CustomerInfoSchema = z. object ({
name: z. string (),
email: z. email (),
phone: z. string (). optional (),
shippingAddress: AddressSchema,
billingAddress: AddressSchema. optional (),
});
export const OrderTotalsSchema = z. object ({
subtotal: z. number (). nonnegative (),
tax: z. number (). nonnegative (),
shipping: z. number (). nonnegative (). optional (),
total: z. number (). nonnegative (),
currency: z. string (). length ( 3 ),
});
export const OrderMetadataSchema = z. object ({
sourceDocument: z. string (),
ingestionTimestamp: z.iso. datetime (),
confidence: z. number (). min ( 0 ). max ( 1 ),
llmCostUsd: z. number (). nonnegative (). optional (),
processedByCache: z. boolean (). optional (),
});
export const OrderSchema = z. object ({
orderId: z. string (). optional (),
items: z. array (LineItemSchema),
customer: CustomerInfoSchema,
totals: OrderTotalsSchema,
metadata: OrderMetadataSchema,
});
export type Order = z . infer < typeof OrderSchema>;
export type LineItem = z . infer < typeof LineItemSchema>;
export type CustomerInfo = z . infer < typeof CustomerInfoSchema>;
export type OrderTotals = z . infer < typeof OrderTotalsSchema>;
export type OrderMetadata = z.infer<typeof OrderMetadataSchema>;
Create src/schemas/ingest.ts, the API response and extracted-document schemas:
import { z } from "zod" ;
import { OrderSchema } from "./order.js" ;
export const IngestResponseSchema = z. object ({
success: z. boolean (),
orderId: z. string (). optional (),
order: OrderSchema. optional (),
processedByCache: z. boolean (),
costUsd: z. number (). nonnegative (),
ingestionId: z. string (),
error: z. string (). optional (),
code: z. string (). optional (),
});
export type IngestResponse = z . infer < typeof IngestResponseSchema>;
export const IngestErrorSchema = z. object ({
success: z. literal ( false ),
error: z. string (),
code: z. string (),
ingestionId: z. string (),
});
export type IngestError = z . infer < typeof IngestErrorSchema>;
export const ExtractedDocumentSchema = z. object ({
text: z. string (),
mimeType: z. string (),
confidence: z. number (). min ( 0 ). max ( 1 ). optional (),
pageCount: z. number (). int (). positive (). optional (),
metadata: z. record (z. string (), z. unknown ()). optional (),
});
export type ExtractedDocument = z.infer<typeof ExtractedDocumentSchema>;
Expected output: pnpm typecheck passes. These schemas are the contract for every module downstream.
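The schemas above check types and ranges field by field, but they do not check that the numbers agree with each other. The sketch below shows one way such a cross-field check could look, written without Zod so it runs on its own. The function name findTotalsIssues and the half-cent tolerance are assumptions, and the arithmetic assumes no discounts or fees beyond tax and shipping.

```typescript
// Structural subsets of the LineItem and OrderTotals shapes defined above.
interface LineItemLike {
  quantity: number;
  unitPrice: number;
  totalPrice: number;
}

interface TotalsLike {
  subtotal: number;
  tax: number;
  shipping?: number;
  total: number;
}

// Assumed tolerance: anything under half a cent is treated as rounding noise.
const CENT_TOLERANCE = 0.005;

function approxEqual(a: number, b: number): boolean {
  return Math.abs(a - b) < CENT_TOLERANCE;
}

// Returns a list of human-readable inconsistencies; an empty list means the
// line items and totals agree with each other.
function findTotalsIssues(items: LineItemLike[], totals: TotalsLike): string[] {
  const issues: string[] = [];

  items.forEach((item, index) => {
    if (!approxEqual(item.quantity * item.unitPrice, item.totalPrice)) {
      issues.push(`items[${index}]: quantity * unitPrice does not match totalPrice`);
    }
  });

  const itemSum = items.reduce((sum, item) => sum + item.totalPrice, 0);
  if (!approxEqual(itemSum, totals.subtotal)) {
    issues.push("subtotal does not match the sum of line items");
  }

  const expectedTotal = totals.subtotal + totals.tax + (totals.shipping ?? 0);
  if (!approxEqual(expectedTotal, totals.total)) {
    issues.push("total does not match subtotal + tax + shipping");
  }

  return issues;
}

const sampleIssues = findTotalsIssues(
  [{ quantity: 2, unitPrice: 9.99, totalPrice: 19.98 }],
  { subtotal: 19.98, tax: 1.6, shipping: 5, total: 26.58 },
);
console.log(sampleIssues.length === 0 ? "totals are consistent" : sampleIssues);
```

A check like this could run after schema validation and before the order is pushed to BigCommerce, since an LLM can return well-typed numbers that still do not add up.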
Step 3: Build the BigCommerce client
Create src/lib/bigcommerce.ts, a retry-enabled HTTP client that talks to the BigCommerce REST API with typed error classes:
import pRetry, { AbortError } from "p-retry" ;
export interface ProductInfo {
id : number ;
sku : string ;
name : string ;
price : number ;
inventoryLevel : number ;
}
export interface OrderInput {
items : Array <{
sku : string ;
name : string ;
quantity : number ;
unitPrice : number ;
totalPrice
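Expected output: pnpm typecheck passes. createOrder wraps its request in pRetry, aborts immediately on 401 and 404 via AbortError, and retries rate limits (429) and server errors (500+) up to three times. As written, any other non-2xx status (for example 422) is thrown as a plain BigCommerceError and is therefore retried as well. getProductBySku returns null on 404 and retries every other failure up to twice. Before going live, check the endpoint and auth scheme against your store: the listing posts to /v3/orders with Basic auth, whereas BigCommerce documents order creation under /v2/orders and authenticates store-level API accounts with an X-Auth-Token header.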
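The retry policy of createOrder can be summarised as a pure function of the HTTP status code. The sketch below mirrors the branching in the listing and adds a stricter variant; both function names are hypothetical, and the strict variant is a suggested alternative rather than what the recipe ships.

```typescript
type RetryDecision = "success" | "abort" | "retry";

// Mirrors createOrder as listed: 2xx succeeds, 401 and 404 abort via
// AbortError, and every other status (429, 5xx, and also other 4xx such
// as 422) throws a plain error, which p-retry treats as retryable.
function decideRetry(status: number): RetryDecision {
  if (status >= 200 && status < 300) return "success";
  if (status === 401 || status === 404) return "abort";
  return "retry";
}

// Stricter alternative (an assumption, not the recipe's behaviour):
// retry only rate limits and server errors, abort on any other failure,
// since a 400 or 422 will not succeed on a second attempt.
function decideRetryStrict(status: number): RetryDecision {
  if (status >= 200 && status < 300) return "success";
  if (status === 429 || status >= 500) return "retry";
  return "abort";
}

for (const status of [201, 401, 404, 422, 429, 503]) {
  console.log(status, decideRetry(status), decideRetryStrict(status));
}
```

The two functions differ only for client errors other than 401, 404, and 429, which is where retrying costs time without changing the outcome.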
Step 4: Create the LLM cache wrapper
Create src/lib/cache.ts, a thin wrapper around @reaatech/llm-cache that stores and retrieves classified order data using OpenAI embeddings:
import { CacheEngine, InMemoryAdapter, OpenAIEmbedder } from "@reaatech/llm-cache" ;
import type { Order } from "../schemas/order.js" ;
export function createCacheService () : CacheEngine {
return new CacheEngine ({
storage: new InMemoryAdapter (),
vectorStorage: new InMemoryAdapter (),
embedder: new OpenAIEmbedder ({
provider: "openai" ,
model: "text-embedding-3-small" ,
dimensions: 1536 ,
apiKey: process.env.OPENAI_API_KEY ?? "" ,
}),
config: {
storage: { adapter: "memory" },
vectorStorage: { adapter: "memory" },
embedding: {
provider: "openai" ,
model: "text-embedding-3-small" ,
dimensions: 1536 ,
batchSize: 100 ,
maxRetries: 3 ,
},
similarity: { threshold: 0.8 , metric: "cosine" , maxResults: 10 },
ttl: {
default: 3600 ,
factual: 1800 ,
creative: 7200 ,
analytical: 3600 ,
sensitive: 600 ,
byUseCase: {},
},
segmentation: { enabled: true , defaultUseCase: "order-classification" },
cost: { enabled: true , currency: "USD" },
observability: { metrics: true , tracing: false , logging: "info" },
},
});
}
export async function getCachedOrder (
cache : CacheEngine ,
prompt : string ,
) : Promise < Order | null > {
const result = await cache. get (prompt, {
model: "command-a-03-2025" ,
useCase: "order-classification" ,
});
if (result.hit) {
return result.entry.response as Order ;
}
return null ;
}
export async function setCachedOrder (
cache : CacheEngine ,
prompt : string ,
order : Order ,
) : Promise < void > {
await cache. set (
prompt,
order,
{ model: "command-a-03-2025" , useCase: "order-classification" },
{ queryType: "factual" , ttl: 1800 },
);
}
export async function invalidateCache (
cache : CacheEngine ,
promptHash ?: string ,
) : Promise < void > {
await cache. invalidate ({
useCase: "order-classification" ,
... (promptHash ? { promptHash } : {}),
});
}
Expected output: pnpm typecheck passes. The cache uses cosine similarity with a 0.8 threshold, so sufficiently similar purchase-order text returns the cached result without calling Cohere again. Both adapters are in-memory, so entries live only for the lifetime of the process. The invalidateCache function lets you purge stale entries.
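To make the 0.8 threshold concrete, here is a small standalone sketch of cosine similarity and a threshold test. It illustrates the metric only; how @reaatech/llm-cache computes and compares embeddings internally is not shown in this recipe, and the function names are hypothetical.

```typescript
// Cosine similarity: dot(a, b) / (|a| * |b|). Ranges from -1 to 1, where 1
// means the vectors point in the same direction.
function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length || a.length === 0) {
    throw new Error("Vectors must be non-empty and of equal length");
  }
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    const x = a[i] ?? 0;
    const y = b[i] ?? 0;
    dot += x * y;
    normA += x * x;
    normB += y * y;
  }
  // A zero vector has no direction; treat it as dissimilar to everything.
  if (normA === 0 || normB === 0) return 0;
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// A lookup counts as a hit when similarity meets or exceeds the threshold.
function isCacheHit(query: number[], cached: number[], threshold = 0.8): boolean {
  return cosineSimilarity(query, cached) >= threshold;
}

// Toy 2-D "embeddings": the first pair is close, the second is 45 degrees apart.
console.log(isCacheHit([1, 0], [1, 0.5]));
console.log(isCacheHit([1, 0], [1, 1]));
```

Raising the threshold trades cache hits for safety: two purchase orders from the same customer with different quantities can embed very close together, so a value that is too low risks returning the wrong cached order.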
Step 5: Build the JSON repair wrapper
Create src/lib/repair.ts. Cohere sometimes returns JSON with markdown fences, trailing commas, or string-typed numbers, and @reaatech/structured-repair-core handles all of it:
import { repair, repairOutput, isValid, analyzeInput, UnrepairableError } from "@reaatech/structured-repair-core" ;
import { OrderSchema, type Order } from "../schemas/order.js" ;
export class OrderRepairError extends Error {
constructor (
message : string ,
public readonly fieldErrors ?: Array <{ path : string ; message : string }>,
public readonly partialData ?: unknown ,
) {
super(message);
this.name = "OrderRepairError" ;
}
}
export async function repairOrderJson (rawJson : string ) : Promise < Order > {
try {
return await repair (OrderSchema, rawJson);
} catch (err) {
if (err instanceof UnrepairableError ) {
throw new OrderRepairError (
`Unrepairable JSON: ${ err . message }` ,
);
}
throw err;
}
}
export function repairOrderJsonWithDiagnostics (rawJson : string ) {
return repairOutput ({
schema: OrderSchema,
input: rawJson,
debug: true ,
});
}
export function isValidOrderJson (json : string ) : boolean {
return isValid (OrderSchema, json);
}
export function analyzeOrderInput (rawJson : string ) {
return analyzeInput (rawJson);
}
Expected output: pnpm typecheck passes. The repairOrderJson function is the primary entrypoint; repairOrderJsonWithDiagnostics is the debugging version used when recovery fails.
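For intuition about what a repair pass does, the sketch below hand-rolls the three fixes named above: stripping a markdown fence, removing trailing commas, and coercing numeric strings. It is a deliberately naive illustration with hypothetical function names, not how @reaatech/structured-repair-core is implemented; in particular, the trailing-comma regex would also touch commas inside string values.

```typescript
// Built at runtime so this example can itself sit inside a fenced block.
const FENCE = "`".repeat(3);

// Drops a leading fence line (with or without a language tag) and a
// trailing fence, if present.
function stripCodeFence(raw: string): string {
  let text = raw.trim();
  if (text.startsWith(FENCE)) {
    const firstNewline = text.indexOf("\n");
    text = firstNewline === -1 ? "" : text.slice(firstNewline + 1);
    if (text.trimEnd().endsWith(FENCE)) {
      text = text.trimEnd().slice(0, -FENCE.length);
    }
  }
  return text.trim();
}

// Naive: removes a comma that is followed only by whitespace and a closing
// bracket or brace. Does not distinguish commas inside string literals.
function removeTrailingCommas(json: string): string {
  return json.replace(/,\s*([}\]])/g, "$1");
}

// Recursively converts numeric strings to numbers, but only under the
// given keys, so identifiers such as SKUs or postal codes stay strings.
function coerceNumericStrings(value: unknown, numericKeys: Set<string>): unknown {
  if (Array.isArray(value)) {
    return value.map((entry) => coerceNumericStrings(entry, numericKeys));
  }
  if (value !== null && typeof value === "object") {
    const out: Record<string, unknown> = {};
    for (const [key, entry] of Object.entries(value as Record<string, unknown>)) {
      const isNumericString =
        typeof entry === "string" && entry.trim() !== "" && Number.isFinite(Number(entry));
      out[key] =
        numericKeys.has(key) && isNumericString
          ? Number(entry)
          : coerceNumericStrings(entry, numericKeys);
    }
    return out;
  }
  return value;
}

function naiveRepair(raw: string, numericKeys: string[]): unknown {
  const parsed: unknown = JSON.parse(removeTrailingCommas(stripCodeFence(raw)));
  return coerceNumericStrings(parsed, new Set(numericKeys));
}

const rawLlmOutput = `${FENCE}json\n{"sku": "0042", "quantity": "3", "tags": ["a", "b",],}\n${FENCE}`;
console.log(naiveRepair(rawLlmOutput, ["quantity", "unitPrice", "totalPrice"]));
```

Restricting coercion to named keys matters for order data: a SKU such as "0042" looks numeric but must remain a string.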
Step 6: Create the cost telemetry service
Create src/lib/telemetry.ts, which tracks the cost of every LLM call and checks daily budgets using @reaatech/llm-cost-telemetry:
import {
generateId,
now,
calculateCostFromTokens,
loadConfig,
CostSpanSchema,
getWindowStart,
type CostSpan,
} from "@reaatech/llm-cost-telemetry" ;
const PRICE_PER_MILLION = 30 ;
export class CostTelemetryService {
private spans : CostSpan [] = [];
private config : ReturnType < typeof loadConfig>;
constructor () {
this.config = loadConfig ();
}
recordCall (
provider : string ,
model : string ,
inputTokens : number ,
outputTokens : number ,
feature : string ,
) : CostSpan {
const totalTokens = inputTokens + outputTokens;
const costUsd = calculateCostFromTokens (totalTokens, PRICE_PER_MILLION);
const span = {
id: generateId (),
provider,
model,
inputTokens,
outputTokens,
costUsd,
feature,
timestamp: now (),
};
const validated = CostSpanSchema. parse (span);
this.spans. push (validated);
return validated;
}
getDailyTotal () : number {
const dayStart = getWindowStart ( new Date (), "day" );
return this.spans
. filter ((s) : s is CostSpan & { timestamp : Date } => s.timestamp !== undefined )
. filter ((s) => s.timestamp >= dayStart)
. reduce ((sum, s) => sum + s.costUsd, 0 );
}
checkBudget () : { withinBudget : boolean ; dailyTotal : number ; dailyLimit : number } {
const dailyTotal = this. getDailyTotal ();
const dailyLimit = this.config.budget.global?.daily ?? 100 ;
return {
withinBudget: dailyTotal <= dailyLimit,
dailyTotal,
dailyLimit,
};
}
}
Expected output: pnpm typecheck passes. The PRICE_PER_MILLION constant applies a single blended rate of $30 per million tokens to input and output tokens alike; adjust it to match your Cohere billing rate.
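The arithmetic behind a per-million-token rate and a daily budget check can be sketched in a few lines. This assumes cost is simply tokens divided by one million, times the rate; the actual formula inside calculateCostFromTokens is not shown in this recipe, and the names below are hypothetical.

```typescript
// Assumed formula: cost = (tokens / 1,000,000) * price per million tokens.
function costFromTokens(totalTokens: number, pricePerMillionUsd: number): number {
  return (totalTokens / 1e6) * pricePerMillionUsd;
}

interface TokenUsage {
  inputTokens: number;
  outputTokens: number;
}

// Sums the cost of a day's calls at one blended rate and compares the
// result with the daily limit, mirroring the shape of checkBudget above.
function dailyBudgetStatus(
  calls: TokenUsage[],
  pricePerMillionUsd: number,
  dailyLimitUsd: number,
): { dailyTotal: number; withinBudget: boolean } {
  const dailyTotal = calls.reduce(
    (sum, call) => sum + costFromTokens(call.inputTokens + call.outputTokens, pricePerMillionUsd),
    0,
  );
  return { dailyTotal, withinBudget: dailyTotal <= dailyLimitUsd };
}

// Two calls of 2,500 and 1,500 tokens at $30 per million tokens.
const status = dailyBudgetStatus(
  [
    { inputTokens: 2000, outputTokens: 500 },
    { inputTokens: 1000, outputTokens: 500 },
  ],
  30,
  100,
);
console.log(status.dailyTotal.toFixed(3), status.withinBudget);
```

If your provider bills input and output tokens at different rates, splitting the rate into two parameters gives a closer estimate than a blended constant.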
Step 7: Set up Langfuse observability
Create src/lib/langfuse.ts, a factory that returns a real Langfuse client when credentials are present, or no-op stubs when they're not:
import Langfuse from "langfuse" ;
export interface LangfuseService {
trace : (name : string , metadata ?: Record < string , unknown >) => void ;
span : (traceId : string , name : string , input ?: Record < string , unknown >) => void ;
flushAsync : () => Promise < void >;
traceIngestion : (
ingestionId : string ,
metadata : {
documentType : string ;
fileSize : number ;
processingTimeMs : number ;
success : boolean ;
},
) => void ;
spanClassify : (
traceId : string ,
params : {
model : string ;
inputTokens : number ;
outputTokens : number ;
costUsd : number ;
},
) => void ;
}
function createNoopService () : LangfuseService {
return {
trace : () => {},
span : () => {},
flushAsync : async () => {},
traceIngestion : () => {},
spanClassify : () => {},
};
}
export function createLangfuseService () : LangfuseService {
const publicKey = process.env.LANGFUSE_PUBLIC_KEY;
const secretKey = process.env.LANGFUSE_SECRET_KEY;
const host = process.env.LANGFUSE_HOST;
if ( ! publicKey || ! secretKey) {
return createNoopService ();
}
const client = new Langfuse ({
publicKey,
secretKey,
... (host ? { host } : {}),
});
return {
trace : (name : string , metadata ?: Record < string , unknown >) => {
client. trace ({ name, ... (metadata ? { metadata } : {}) });
},
span : (traceId : string , name : string , input ?: Record < string , unknown >) => {
client. span ({ traceId, name, ... (input ? { input } : {}) });
},
flushAsync : () => client. flushAsync (),
traceIngestion : (
ingestionId : string ,
metadata : {
documentType : string ;
fileSize : number ;
processingTimeMs : number ;
success : boolean ;
},
) => {
client. trace ({ id: ingestionId, name: "document-ingestion" , metadata });
},
spanClassify : (
traceId : string ,
params : {
model : string ;
inputTokens : number ;
outputTokens : number ;
costUsd : number ;
},
) => {
client. span ({ traceId, name: "llm-classify" , input: params });
},
};
}
Expected output: pnpm typecheck passes. When the Langfuse env vars are absent, all methods are no-ops: no crashes, no error logs.
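The factory above is an instance of the null-object pattern: callers always receive something with the same interface, so they never need to check whether tracing is configured. Here is a stripped-down sketch of the idea with a hypothetical Tracer interface and an injectable sink in place of the Langfuse client.

```typescript
interface Tracer {
  readonly enabled: boolean;
  trace(name: string, metadata?: Record<string, unknown>): void;
}

type EnvVars = Record<string, string | undefined>;

// Returns a silent no-op tracer when either key is missing; otherwise a
// tracer that serialises each event to the sink. The sink stands in for
// a real client and defaults to console.log for illustration.
function createTracer(env: EnvVars, sink: (line: string) => void = console.log): Tracer {
  if (!env["LANGFUSE_PUBLIC_KEY"] || !env["LANGFUSE_SECRET_KEY"]) {
    return { enabled: false, trace: () => {} };
  }
  return {
    enabled: true,
    trace: (name, metadata) => sink(JSON.stringify({ name, metadata })),
  };
}

// Call sites look identical in both configurations.
const silentTracer = createTracer({});
silentTracer.trace("document-ingestion", { success: true });

const activeTracer = createTracer({ LANGFUSE_PUBLIC_KEY: "pk", LANGFUSE_SECRET_KEY: "sk" });
activeTracer.trace("document-ingestion", { success: true });
```

The benefit is that optional infrastructure stays optional: the ingestion code path is the same in local development, tests, and production.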
Step 8: Implement file detection
Create src/services/file-detector.ts, which uses file-type to identify uploaded documents and map them to internal categories:
import { fileTypeFromBuffer } from "file-type" ;
export interface DetectedFileType {
ext : string ;
mime : string ;
category : "pdf" | "docx" | "image" | "unknown" ;
}
export async function detectDocumentType (buffer : Buffer ) : Promise < DetectedFileType > {
try {
const result = await fileTypeFromBuffer (buffer);
if ( ! result) {
return { ext: "unknown" , mime: "application/octet-stream" , category: "unknown" };
}
const mimeToCategory : Record < string , "pdf" | "docx" | "image" > = {
"application/pdf" : "pdf" ,
"application/vnd.openxmlformats-officedocument.wordprocessingml.document" : "docx" ,
"image/png" : "image" ,
"image/jpeg" : "image" ,
"image/tiff" : "image" ,
"image/webp" : "image" ,
};
const category = mimeToCategory[result.mime] ?? "unknown" ;
return {
ext: result.ext,
mime: result.mime,
category,
};
} catch {
return { ext: "unknown" , mime: "application/octet-stream" , category: "unknown" };
}
}
Expected output: pnpm typecheck passes. Unknown types are caught early in the pipeline before any extraction or LLM call.
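Content-based detection works by comparing the first bytes of a file against known signatures rather than trusting the filename. The sketch below checks four well-known signatures by hand to illustrate the principle; it is a simplification with hypothetical names, and file-type covers far more formats. Note that a DOCX file is a ZIP archive, so the leading bytes alone can only say "ZIP container"; telling DOCX apart from other ZIP-based formats requires looking inside the archive.

```typescript
type SniffedCategory = "pdf" | "zip-container" | "image" | "unknown";

// Leading-byte signatures for a handful of formats.
const SIGNATURES: Array<{ category: SniffedCategory; bytes: number[] }> = [
  { category: "pdf", bytes: [0x25, 0x50, 0x44, 0x46] }, // "%PDF"
  { category: "zip-container", bytes: [0x50, 0x4b, 0x03, 0x04] }, // "PK\x03\x04"
  { category: "image", bytes: [0x89, 0x50, 0x4e, 0x47] }, // PNG
  { category: "image", bytes: [0xff, 0xd8, 0xff] }, // JPEG
];

// Returns the category of the first signature that matches the start of
// the data, or "unknown" when nothing matches.
function sniffCategory(data: Uint8Array): SniffedCategory {
  for (const signature of SIGNATURES) {
    const matches =
      data.length >= signature.bytes.length &&
      signature.bytes.every((byte, index) => data[index] === byte);
    if (matches) return signature.category;
  }
  return "unknown";
}

// "%PDF-" as raw bytes.
console.log(sniffCategory(Uint8Array.from([0x25, 0x50, 0x44, 0x46, 0x2d])));
```

Detecting by content rather than extension is what lets the pipeline reject a renamed executable or an empty upload before spending time on extraction or an LLM call.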
Step 9: Build the document extractor
Create src/services/document-extractor.ts, which extracts text from PDFs using pdfjs-dist and from DOCX files using mammoth:
import * as pdfjsLib from "pdfjs-dist" ;
import mammoth from "mammoth" ;
import {
createDocumentExtractionOperations,
} from "@reaatech/media-pipeline-mcp-doc-extraction" ;
import { ExtractedDocumentSchema, type ExtractedDocument } from "../schemas/ingest.js" ;
export class DocumentExtractionError extends Error {
constructor (
message : string ,
public code ?: string ,
) {
super(message);
this.name = "DocumentExtractionError" ;
}
}
export
Expected output: pnpm typecheck passes. Image MIME types throw with USE_OCR — the orchestrator routes those to the OCR processor instead. The extractFieldsFromDocument function provides structured field extraction via the REAA media pipeline.
Step 10: Build the OCR processor for scanned images
Create src/services/ocr-processor.ts, which preprocesses images with Sharp (greyscale, normalize, sharpen) and then runs Tesseract.js OCR:
import { fileTypeFromBuffer } from "file-type" ;
import { createWorker } from "tesseract.js" ;
import sharp from "sharp" ;
import { ExtractedDocumentSchema, type ExtractedDocument } from "../schemas/ingest.js" ;
class OcrProcessingError extends Error {
constructor (message : string ) {
super(message);
this.name = "OcrProcessingError" ;
}
}
export async function processImage (buffer : Buffer ) : Promise < ExtractedDocument > {
const fileType = await fileTypeFromBuffer (buffer);
if ( ! fileType || ! fileType.mime. startsWith ( "image/" )) {
throw new OcrProcessingError ( "Unsupported image type" );
}
let preprocessed : Buffer ;
try {
preprocessed = await sharp (buffer). greyscale (). normalize (). sharpen (). toBuffer ();
} catch {
preprocessed = buffer;
}
const worker = await createWorker ( "eng" );
try {
const { data } = await worker. recognize (preprocessed);
const confidence = data.confidence / 100 ;
return ExtractedDocumentSchema. parse ({
text: data.text,
mimeType: fileType.mime,
confidence,
});
} finally {
await worker. terminate ();
}
}
Expected output: pnpm typecheck passes. The worker is always terminated in the finally block to prevent resource leaks.
Step 11: Create the order classifier with Cohere
Create src/services/order-classifier.ts, the heart of the pipeline. It checks the cache first, calls Cohere's Command A on a cache miss, repairs the output, and stores the result:
import { CohereClientV2, CohereError } from "cohere-ai" ;
import { getCachedOrder, setCachedOrder } from "../lib/cache.js" ;
import { repairOrderJson, repairOrderJsonWithDiagnostics } from "../lib/repair.js" ;
import type { Order } from "../schemas/order.js" ;
import type { CacheEngine } from "@reaatech/llm-cache" ;
import type { CostTelemetryService } from "../lib/telemetry.js" ;
export class OrderClassificationError extends Error {
constructor (
message : string ,
public readonly originalError ?: unknown ,
) {
super(message);
Expected output: pnpm typecheck passes. The cache check at the top of the function means repeated uploads with near-identical text skip the LLM call entirely.
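The check-then-call-then-store sequence is the cache-aside pattern. The sketch below shows it in isolation with an exact-match Map and a synchronous stand-in for the LLM call, so it runs without any services. The real classifier is asynchronous and matches on embedding similarity rather than exact text, and the names here are hypothetical.

```typescript
interface ClassifyResult<T> {
  value: T;
  fromCache: boolean;
}

// Wraps an expensive classify function with a cache-aside lookup.
// Keys are normalised so whitespace and case differences still hit.
function createCachedClassifier<T>(classify: (text: string) => T) {
  const cache = new Map<string, T>();
  const normalize = (text: string): string =>
    text.trim().replace(/\s+/g, " ").toLowerCase();

  return (text: string): ClassifyResult<T> => {
    const key = normalize(text);
    if (cache.has(key)) {
      // Cache hit: skip the expensive call entirely.
      return { value: cache.get(key) as T, fromCache: true };
    }
    // Cache miss: call through, then store the result for next time.
    const value = classify(text);
    cache.set(key, value);
    return { value, fromCache: false };
  };
}

// Stand-in for the LLM call; counts invocations to show the saving.
let llmCalls = 0;
const classifyOrder = createCachedClassifier((text: string) => {
  llmCalls += 1;
  return { wordCount: text.trim().split(/\s+/).length };
});

console.log(classifyOrder("PO 1001: 3 x Widget"));
console.log(classifyOrder("  po 1001:   3 x widget "));
console.log(`LLM calls: ${llmCalls}`);
```

Returning a fromCache flag alongside the value is what allows the caller to report zero token cost for cached results, as the classifier listing does with costInputTokens and costOutputTokens.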
Step 12: Build the ingestion orchestrator
Create src/services/ingestion-orchestrator.ts, which ties every service together into a single processDocument method:
import { generateId } from "@reaatech/llm-cost-telemetry" ;
import { type BigCommerceClient, BigCommerceServerError, BigCommerceAuthError } from "../lib/bigcommerce.js" ;
import type { CacheEngine } from "@reaatech/llm-cache" ;
import type { LangfuseService } from "../lib/langfuse.js" ;
import type { CostTelemetryService } from "../lib/telemetry.js" ;
import { type DetectedFileType, detectDocumentType } from "./file-detector.js" ;
import { extractDocument } from "./document-extractor.js" ;
import type { ExtractedDocument } from "../schemas/ingest.js" ;
import { processImage } from "./ocr-processor.js" ;
import
Expected output: pnpm typecheck passes. The orchestrator is the single entry point that the Fastify server calls — it handles success, failure at any stage, and partial failure (LLM succeeded but BigCommerce failed). Every outcome is traced through Langfuse.
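One way to picture how an orchestrator can report success, failure at any stage, and partial failure is a runner that executes named stages in order and records how far it got. This is an illustrative sketch only: the stage names and result shape are assumptions, the stages are synchronous for brevity, and the actual processDocument implementation is not reproduced here.

```typescript
type StageName = "detect" | "extract" | "classify" | "create-order";

interface PipelineResult {
  success: boolean;
  completed: StageName[];
  failedStage?: StageName;
  error?: string;
}

// Runs stages in order and stops at the first one that throws. The
// completed list makes partial failure visible: if "classify" is in it
// but failedStage is "create-order", the LLM work succeeded and only
// the push to the store needs to be retried.
function runStages(stages: Array<{ name: StageName; run: () => void }>): PipelineResult {
  const completed: StageName[] = [];
  for (const stage of stages) {
    try {
      stage.run();
      completed.push(stage.name);
    } catch (err) {
      return {
        success: false,
        completed,
        failedStage: stage.name,
        error: err instanceof Error ? err.message : String(err),
      };
    }
  }
  return { success: true, completed };
}

const partialFailure = runStages([
  { name: "detect", run: () => {} },
  { name: "extract", run: () => {} },
  { name: "classify", run: () => {} },
  {
    name: "create-order",
    run: () => {
      throw new Error("BigCommerce server error");
    },
  },
]);
console.log(partialFailure);
```

Capturing the failing stage by name also gives the tracing layer a natural label for each outcome.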
Step 13: Wire up the Fastify server
Create src/server.ts, which registers multipart uploads, instantiates all services, and exposes three routes:
import Fastify from "fastify" ;
import fastifyMultipart from "@fastify/multipart" ;
import { BigCommerceClient } from "./lib/bigcommerce.js" ;
import { createCacheService } from "./lib/cache.js" ;
import { createLangfuseService } from "./lib/langfuse.js" ;
import { CostTelemetryService } from "./lib/telemetry.js" ;
import { IngestionOrchestrator } from "./services/ingestion-orchestrator.js" ;
export async function createApp () {
const app = Fastify ({ logger: true });
await app. register (fastifyMultipart, { limits: { fileSize: 10_485_760 } });
const cache = createCacheService ();
const langfuseService = createLangfuseService ();
const telemetry = new CostTelemetryService ();
const bigCommerce = new BigCommerceClient (
process.env.BIGCOMMERCE_STORE_HASH ?? "" ,
process.env.BIGCOMMERCE_API_USERNAME ?? "" ,
process.env.BIGCOMMERCE_API_TOKEN ?? "" ,
);
const orchestrator = new IngestionOrchestrator (
bigCommerce,
langfuseService,
telemetry,
cache,
);
app. post ( "/api/ingest" , async (request, reply) => {
const data = await request. file ();
if ( ! data) {
return reply. status ( 400 ). send ({ error: "No file attached" });
}
const buffer = await data. toBuffer ();
if (buffer.length === 0 ) {
return reply. status ( 400 ). send ({ error: "Empty file" });
}
const result = await orchestrator. processDocument (buffer, data.filename);
const status = result.success ? 200 : 400 ;
return reply. status (status). send (result);
});
app. get ( "/api/health" , () => {
return { status: "ok" , uptime: process. uptime () };
});
app. get ( "/api/metrics/cost" , () => {
const budget = telemetry. checkBudget ();
return {
dailyTotal: budget.dailyTotal,
budgetStatus: budget.withinBudget ? "within" : "exceeded" ,
};
});
return { app, langfuseService };
}
async function startApp () {
const { app, langfuseService } = await createApp ();
const port = Number (process.env.PORT) || 3001 ;
await app. listen ({ port, host: "0.0.0.0" });
const handleSigterm = async () => {
await langfuseService. flushAsync ();
await app. close ();
};
process. on ( "SIGTERM" , () => {
void handleSigterm ();
});
}
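export { startApp };
Expected output: Running pnpm dev:fastify starts the server on port 3001 with logger: true. Note that the listing only exports startApp and never calls it, so invoke startApp() from your entry point (or at the bottom of src/server.ts) for the port to actually bind. Upload a PDF with curl -F "file=@order.pdf" http://localhost:3001/api/ingest and you'll get back a structured IngestResponse.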
Step 14: Create the Next.js dashboard and API routes
Replace the generated app/page.tsx with a live dashboard that reads from an in-memory order store, and create two Next.js API routes that expose the same data to the frontend.
First, create the shared in-memory store at src/lib/order-store.ts:
export interface OrderSummary {
id : string ;
status : "success" | "error" ;
filename : string ;
costUsd : number ;
processedByCache : boolean ;
error ?: string ;
timestamp : Date ;
}
const orders : OrderSummary [] = [];
export function addOrder (order : OrderSummary ) : void {
orders. push (order);
}
export function getAllOrders () : OrderSummary [] {
return [ ... orders];
}
export function getOrderById (id : string ) : OrderSummary | undefined {
return orders. find ((o) => o.id === id);
}
export function resetStore () : void {
orders.length = 0 ;
}
export function getStats () : { totalOrders : number ; totalCostUsd : number ; totalErrors : number ; recentOrders : OrderSummary [] } {
const totalOrders = orders.length;
const totalCostUsd = orders. reduce ((sum, o) => sum + (o.status === "success" ? o.costUsd : 0 ), 0 );
const totalErrors = orders. filter ((o) => o.status === "error" ).length;
const recentOrders = [ ... orders]. reverse (). slice ( 0 , 10 );
return { totalOrders, totalCostUsd, totalErrors, recentOrders };
}
Create app/api/orders/route.ts, a Next.js App Router route handler:
import { NextResponse } from "next/server" ;
import { getStats } from "../../../src/lib/order-store.js" ;
export function GET () : Response {
const stats = getStats ();
return NextResponse. json ({ orders: stats.recentOrders, stats });
}
Create app/api/orders/[id]/route.ts, an individual order lookup using the Next.js async params pattern:
import { type NextRequest, NextResponse } from "next/server" ;
import { getOrderById } from "../../../../src/lib/order-store.js" ;
export async function GET (
_req : NextRequest ,
{ params } : { params : Promise <{ id : string }> },
) : Promise < Response > {
const { id } = await params;
const order = getOrderById (id) ?? null ;
return NextResponse. json ({ order, id });
}
Finally, replace app/page.tsx with a client-side dashboard that fetches order stats and Fastify cost metrics:
"use client" ;
import { useEffect, useState } from "react" ;
interface Stats {
totalOrders : number ;
totalCostUsd : number ;
totalErrors : number ;
recentOrders : Array <{
id : string ;
status : string ;
filename : string ;
costUsd : number ;
error ?: string ;
timestamp : string ;
}>;
}
export default
Expected output: Running pnpm dev starts the Next.js app on port 3000. Navigate to http://localhost:3000 to see the dashboard with order statistics and a recent orders table.
Step 15: Create the instrumentation file
Create src/instrumentation.ts. Next.js calls register() at startup when experimental.instrumentationHook is enabled in next.config.ts:
export async function register () : Promise < void > {
if (process.env.NEXT_RUNTIME === "nodejs" ) {
const { loadConfig } = await import ( "@reaatech/llm-cost-telemetry" );
loadConfig ();
const { createCacheService } = await import ( "./lib/cache.js" );
createCacheService ();
const { createLangfuseService } = await import ( "./lib/langfuse.js" );
createLangfuseService ();
}
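}
Expected output: The register() function loads the telemetry config and constructs the cache and Langfuse services once on server start. Because the returned instances are discarded, this acts as a startup check rather than as shared state. The NEXT_RUNTIME guard prevents execution in the Edge runtime.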
Step 16: Run the tests
The project includes a test suite covering every module. Run all tests with the script defined in package.json:
Tests: 18 files | 60+ tests passed
Coverage: lines/branches/functions/statements all >= 90%
The test suite covers happy paths (PDF ingestion returns a parsed order), error paths (no file attached returns 400, unknown file type returns UNSUPPORTED_FORMAT), and boundary cases (empty buffer, cache hit that skips the LLM). Services that make network calls use MSW (BigCommerceClient) or vi.mock (Cohere, Tesseract, file-type) so tests are fast and deterministic.
Next steps
Add a webhook callback — trigger a Slack or email notification when an order is successfully created in BigCommerce.
Extend the dashboard — add a chart of LLM cost over time using the daily totals from @reaatech/llm-cost-telemetry.
Integrate product lookup — pair getProductBySku with the classifier to validate SKUs and auto-fill prices before pushing to BigCommerce.
Add support for more document formats — extend file-detector.ts to handle CSV or XLSX by adding new extractors.
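Listing continuation: src/lib/bigcommerce.ts (Step 3), resuming immediately after the totalPrice field name inside OrderInput.items:
: number;
}>;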
customer : {
name : string ;
email : string ;
phone ?: string ;
shippingAddress : {
line1 : string ;
line2 ?: string ;
city : string ;
state : string ;
postalCode : string ;
country : string ;
};
billingAddress ?: {
line1 : string ;
line2 ?: string ;
city : string ;
state : string ;
postalCode : string ;
country : string ;
};
};
totals : {
subtotal : number ;
tax : number ;
shipping ?: number ;
total : number ;
currency : string ;
};
}
export interface BigCommerceOrderResponse {
id : number ;
[key : string ] : unknown ;
}
export class BigCommerceError extends Error {
constructor (
message : string ,
public statusCode : number ,
public body ?: unknown ,
) {
super(message);
this.name = "BigCommerceError" ;
}
}
export class BigCommerceAuthError extends BigCommerceError {
constructor (body ?: unknown ) {
super( "Authentication failed" , 401 , body);
this.name = "BigCommerceAuthError" ;
}
}
export class BigCommerceRateLimitError extends BigCommerceError {
constructor (body ?: unknown ) {
super( "Rate limited" , 429 , body);
this.name = "BigCommerceRateLimitError" ;
}
}
export class BigCommerceServerError extends BigCommerceError {
constructor (statusCode : number , body ?: unknown ) {
super( "BigCommerce server error" , statusCode, body);
this.name = "BigCommerceServerError" ;
}
}
export class BigCommerceClient {
private baseUrl : string ;
private authHeader : string ;
constructor (
private storeHash : string ,
apiUsername : string ,
apiToken : string ,
) {
this.baseUrl = `https://api.bigcommerce.com/stores/${ storeHash }/v3` ;
this.authHeader = `Basic ${ Buffer . from ( `${ apiUsername }:${ apiToken }` ). toString ( "base64" ) }` ;
}
async createOrder (order : OrderInput ) : Promise < BigCommerceOrderResponse > {
return pRetry (
async () => {
const response = await fetch ( `${ this . baseUrl }/orders` , {
method: "POST" ,
headers: {
Authorization: this.authHeader,
Accept: "application/json" ,
"Content-Type" : "application/json" ,
},
body: JSON. stringify (order),
});
if (response.status === 401 ) {
throw new AbortError ( new BigCommerceAuthError ());
}
if (response.status === 429 ) {
throw new BigCommerceRateLimitError ();
}
if (response.status >= 500 ) {
throw new BigCommerceServerError (response.status);
}
if (response.status === 404 ) {
throw new AbortError ( new BigCommerceError ( "Not found" , 404 ));
}
if ( ! response.ok) {
throw new BigCommerceError (
`Unexpected status: ${ String ( response . status ) }` ,
response.status,
);
}
return response. json () as Promise < BigCommerceOrderResponse >;
},
{ retries: 3 , onFailedAttempt: console.warn },
);
}
async getProductBySku (sku : string ) : Promise < ProductInfo | null > {
return pRetry (
async () => {
const url = `${ this . baseUrl }/catalog/products?sku=${ encodeURIComponent ( sku ) }` ;
const response = await fetch (url, {
headers: {
Authorization: this.authHeader,
Accept: "application/json" ,
},
});
if (response.status === 404 ) return null ;
if ( ! response.ok) {
throw new BigCommerceError (
`Failed to fetch product: ${ String ( response . status ) }` ,
response.status,
);
}
const data = ( await response. json ()) as { data : ProductInfo [] };
return data.data[ 0 ] ?? null ;
},
{ retries: 2 , onFailedAttempt : () => {} },
);
}
}
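Listing continuation: src/services/document-extractor.ts, resuming immediately after the export keyword where the listing above breaks off:
async function extractTextFromPdf(buffer: Buffer): Promise<string> {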
try {
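// pdf.js expects a plain Uint8Array and rejects Node Buffers, so copy the bytes first.
const doc = await pdfjsLib.getDocument({ data: new Uint8Array(buffer) }).promise;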
const pages : string [] = [];
for ( let i = 1 ; i <= doc.numPages; i ++ ) {
const page = await doc. getPage (i);
const textContent = await page. getTextContent ();
const pageText = textContent.items
. map ((item) => {
if ( "str" in item) return item.str;
return "" ;
})
. join ( " " );
pages. push (pageText);
}
return pages. join ( "\n" );
} catch (err) {
throw new DocumentExtractionError (
`PDF extraction failed: ${ err instanceof Error ? err . message : String ( err ) }` ,
);
}
}
export async function extractTextFromDocx (buffer : Buffer ) : Promise < string > {
try {
const result = await mammoth. extractRawText ({ buffer });
return result.value;
} catch (err) {
throw new DocumentExtractionError (
`DOCX extraction failed: ${ err instanceof Error ? err . message : String ( err ) }` ,
);
}
}
export async function extractDocument (
buffer : Buffer ,
mimeType : string ,
extractor ?: (buffer : Buffer , mimeType : string ) => Promise < ExtractedDocument >,
) : Promise < ExtractedDocument > {
if (extractor) {
return extractor (buffer, mimeType);
}
let text : string ;
let pageCount : number | undefined ;
if (mimeType === "application/pdf" ) {
text = await extractTextFromPdf (buffer);
// Pass a fresh Uint8Array copy; pdf.js rejects Node Buffers.
const doc = await pdfjsLib.getDocument({ data: new Uint8Array(buffer) }).promise;
pageCount = doc.numPages;
} else if (mimeType === "application/vnd.openxmlformats-officedocument.wordprocessingml.document" ) {
text = await extractTextFromDocx (buffer);
} else if (mimeType. startsWith ( "image/" )) {
throw new DocumentExtractionError ( "Use OCR processor for image types" , "USE_OCR" );
} else {
throw new DocumentExtractionError (
`Unsupported MIME type: ${ mimeType }` ,
"UNSUPPORTED_FORMAT" ,
);
}
return ExtractedDocumentSchema. parse ({
text,
mimeType,
pageCount,
});
}
export async function extractFieldsFromDocument (
artifactId : string ,
fields : Array <{ name : string ; type : string ; description ?: string }>,
) : Promise < unknown > {
try {
const ops = createDocumentExtractionOperations (
{} as never ,
{} as never ,
);
const result = await ops. extractFields ({
artifactId,
fields: fields as Array <{ name : string ; type : "string" | "number" | "date" | "boolean" | "array" ; description ?: string }>,
});
return result;
} catch (err) {
throw new DocumentExtractionError (
`Field extraction failed: ${ err instanceof Error ? err . message : String ( err ) }` ,
);
}
}
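The MIME-type branching in `extractDocument` can also be expressed as a small pure routing function, which is easy to unit test without loading a PDF or DOCX parser. The following is a minimal sketch rather than the recipe's implementation; `routeByMimeType` and the `ExtractionRoute` type are hypothetical names introduced here.

```typescript
// Hypothetical helper (not part of the recipe): decides which extractor
// should handle a document, mirroring the branches in extractDocument.
type ExtractionRoute = "pdf" | "docx" | "ocr" | "unsupported";

const DOCX_MIME =
  "application/vnd.openxmlformats-officedocument.wordprocessingml.document";

function routeByMimeType(mimeType: string): ExtractionRoute {
  if (mimeType === "application/pdf") return "pdf";
  if (mimeType === DOCX_MIME) return "docx";
  // Images carry no extractable text layer, so they go to the OCR path.
  if (mimeType.startsWith("image/")) return "ocr";
  return "unsupported";
}

console.log(routeByMimeType("application/pdf"));
console.log(routeByMimeType("image/png"));
console.log(routeByMimeType("text/csv"));
```

Keeping the decision separate from the extraction work means the unsupported and OCR cases can be tested with plain strings.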
this.name = "OrderClassificationError";
}
}
export async function classifyOrderDocument (
text : string ,
cache : CacheEngine ,
telemetry : CostTelemetryService ,
) : Promise <{
order : Order ;
fromCache : boolean ;
costInputTokens : number ;
costOutputTokens : number ;
}> {
const cohere = new CohereClientV2 ({});
const prompt = buildClassificationPrompt (text);
const cached = await getCachedOrder (cache, prompt);
if (cached !== null ) {
return {
order: cached,
fromCache: true ,
costInputTokens: 0 ,
costOutputTokens: 0 ,
};
}
const inputTokens = estimateTokens (prompt);
let responseText : string ;
try {
const response = await cohere. chat ({
model: "command-a-03-2025" ,
messages: [{ role: "user" , content: prompt }],
});
const message = response.message;
const contentArray = message.content;
if (Array. isArray (contentArray) && contentArray.length > 0 ) {
const content = contentArray[ 0 ];
responseText = typeof content === "string" ? content : ((content as { text?: string }).text ?? "");
} else {
responseText = JSON. stringify (response);
}
} catch (err) {
if (err instanceof CohereError ) {
throw new OrderClassificationError ( `Cohere API error: ${ err . message }` , err);
}
throw err;
}
const outputTokens = estimateTokens (responseText);
let order : Order ;
try {
order = await repairOrderJson (responseText);
} catch (err) {
const diagnostics = repairOrderJsonWithDiagnostics (responseText);
throw new OrderClassificationError (
`Failed to repair order JSON: ${ err instanceof Error ? err . message : String ( err ) }` ,
{ originalError: err, diagnostics },
);
}
await setCachedOrder (cache, prompt, order);
telemetry. recordCall ( "cohere" , "command-a-03-2025" , inputTokens, outputTokens, "order-classification" );
return {
order,
fromCache: false ,
costInputTokens: inputTokens,
costOutputTokens: outputTokens,
};
}
function buildClassificationPrompt (text : string ) : string {
return `Extract order information from the following document text. Return a JSON object with:
- items: array of { sku, name, quantity, unitPrice, totalPrice }
- customer: { name, email, phone?, shippingAddress: { line1, line2?, city, state, postalCode, country }, billingAddress?: { line1, line2?, city, state, postalCode, country } }
- totals: { subtotal, tax, shipping?, total, currency (3-letter code) }
Document text:
${ text }
Return ONLY valid JSON with no markdown fences or extra text.` ;
}
function estimateTokens (text : string ) : number {
return Math. ceil (text.length / 4 );
}
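`classifyOrderDocument` reads only the first content part of the chat response. If a response carried several text parts, a helper along the following lines could concatenate them. This is a sketch under the assumption that each part is an object with an optional `text` string, which is the shape the code above already casts to; `collectResponseText` and `ContentPart` are hypothetical names. The token estimate repeats the four-characters-per-token heuristic from `estimateTokens`.

```typescript
// Assumed shape of one content part; only `text` is read.
interface ContentPart {
  type?: string;
  text?: string;
}

// Hypothetical helper: join the text of every part, treating parts
// without a text field as empty strings.
function collectResponseText(content: unknown): string {
  if (typeof content === "string") return content;
  if (!Array.isArray(content)) return "";
  return content
    .map((part: ContentPart | string) =>
      typeof part === "string" ? part : (part.text ?? ""),
    )
    .join("");
}

// Same heuristic as the recipe's estimateTokens: about 4 characters per token.
function estimateTokens(text: string): number {
  return Math.ceil(text.length / 4);
}

const joined = collectResponseText([
  { type: "text", text: '{"items":' },
  { type: "text", text: "[]}" },
]);
console.log(joined, estimateTokens(joined));
```

The character-based estimate is only an approximation, so any cost figure derived from it should be treated as a rough guide rather than a billing number.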
import { classifyOrderDocument } from "./order-classifier.js";
import { IngestResponseSchema, type IngestResponse } from "../schemas/ingest.js" ;
import type { Order } from "../schemas/order.js" ;
export class IngestionOrchestrator {
constructor (
private bigCommerce : BigCommerceClient ,
private langfuse : LangfuseService ,
private telemetry : CostTelemetryService ,
private cache : CacheEngine ,
) {}
async processDocument (buffer : Buffer , filename : string ) : Promise < IngestResponse > {
const ingestionId = generateId ();
const startTime = Date. now ();
try {
if (buffer.length === 0 ) {
return this. buildErrorResponse (ingestionId, "EMPTY_FILE" , "Buffer is empty" );
}
const fileType : DetectedFileType = await detectDocumentType (buffer);
if (fileType.category === "unknown" ) {
return this. buildErrorResponse (ingestionId, "UNSUPPORTED_FORMAT" , "Unknown file type" );
}
let extracted : ExtractedDocument ;
if (fileType.category === "image" ) {
extracted = await processImage (buffer);
} else {
extracted = await extractDocument (buffer, fileType.mime);
}
const classResult = await classifyOrderDocument (
extracted.text,
this.cache,
this.telemetry,
);
let bigCommerceOrderId : string | undefined ;
try {
const bcOrder = await this.bigCommerce. createOrder ({
items: classResult.order.items. map ((item) => ({
sku: item.sku,
name: item.name,
quantity: item.quantity,
unitPrice: item.unitPrice,
totalPrice: item.totalPrice,
})),
customer: {
name: classResult.order.customer.name,
email: classResult.order.customer.email,
phone: classResult.order.customer.phone,
shippingAddress: {
line1: classResult.order.customer.shippingAddress.line1,
line2: classResult.order.customer.shippingAddress.line2,
city: classResult.order.customer.shippingAddress.city,
state: classResult.order.customer.shippingAddress.state,
postalCode: classResult.order.customer.shippingAddress.postalCode,
country: classResult.order.customer.shippingAddress.country,
},
billingAddress: classResult.order.customer.billingAddress
? {
line1: classResult.order.customer.billingAddress.line1,
line2: classResult.order.customer.billingAddress.line2,
city: classResult.order.customer.billingAddress.city,
state: classResult.order.customer.billingAddress.state,
postalCode: classResult.order.customer.billingAddress.postalCode,
country: classResult.order.customer.billingAddress.country,
}
: undefined ,
},
totals: {
subtotal: classResult.order.totals.subtotal,
tax: classResult.order.totals.tax,
shipping: classResult.order.totals.shipping,
total: classResult.order.totals.total,
currency: classResult.order.totals.currency,
},
});
bigCommerceOrderId = String (bcOrder.id);
} catch (bcErr) {
const processingTimeMs = Date. now () - startTime;
this.langfuse. traceIngestion (ingestionId, {
documentType: fileType.category,
fileSize: buffer.length,
processingTimeMs,
success: false ,
});
return IngestResponseSchema. parse ({
success: false ,
ingestionId,
order: classResult.order,
processedByCache: classResult.fromCache,
costUsd: this.telemetry. getDailyTotal (),
error: bcErr instanceof Error ? bcErr.message : "BigCommerce order creation failed" ,
code: bcErr instanceof BigCommerceAuthError ? "BIGCOMMERCE_AUTH_ERROR"
: bcErr instanceof BigCommerceServerError ? "BIGCOMMERCE_SERVER_ERROR"
: "BIGCOMMERCE_ERROR" ,
});
}
const costUsd = this.telemetry. getDailyTotal ();
const processingTimeMs = Date. now () - startTime;
const order : Order = {
... classResult.order,
metadata: {
... classResult.order.metadata,
sourceDocument: filename,
ingestionTimestamp: new Date (). toISOString (),
confidence: 1 ,
llmCostUsd: costUsd,
processedByCache: classResult.fromCache,
},
};
this.langfuse. traceIngestion (ingestionId, {
documentType: fileType.category,
fileSize: buffer.length,
processingTimeMs,
success: true ,
});
return IngestResponseSchema. parse ({
success: true ,
ingestionId,
orderId: bigCommerceOrderId,
order,
processedByCache: classResult.fromCache,
costUsd,
});
} catch (err) {
const processingTimeMs = Date. now () - startTime;
this.langfuse. traceIngestion (ingestionId, {
documentType: "unknown" ,
fileSize: buffer.length,
processingTimeMs,
success: false ,
});
return IngestResponseSchema. parse ({
success: false ,
ingestionId,
processedByCache: false ,
costUsd: this.telemetry. getDailyTotal (),
error: err instanceof Error ? err.message : String (err),
code: err instanceof Error ? err.name : "UNKNOWN_ERROR" ,
});
}
}
private buildErrorResponse (
ingestionId : string ,
code : string ,
error : string ,
) : IngestResponse {
return IngestResponseSchema. parse ({
success: false ,
ingestionId,
processedByCache: false ,
costUsd: this.telemetry. getDailyTotal (),
error,
code,
});
}
}
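The nested ternary that maps a BigCommerce failure to a response `code` in `processDocument` can be pulled out into a function so each mapping is testable on its own. A minimal sketch follows. The two error classes are stand-ins declared locally so the block runs by itself, and `toBigCommerceErrorCode` is a hypothetical name; the recipe defines its own error classes elsewhere.

```typescript
// Stand-in error classes so this sketch is self-contained; they only
// share names with the classes the orchestrator checks against.
class BigCommerceAuthError extends Error {}
class BigCommerceServerError extends Error {}

// Hypothetical helper mirroring the ternary in processDocument.
function toBigCommerceErrorCode(err: unknown): string {
  if (err instanceof BigCommerceAuthError) return "BIGCOMMERCE_AUTH_ERROR";
  if (err instanceof BigCommerceServerError) return "BIGCOMMERCE_SERVER_ERROR";
  // Anything else, including non-Error values, falls back to the generic code.
  return "BIGCOMMERCE_ERROR";
}

console.log(toBigCommerceErrorCode(new BigCommerceAuthError("unauthorized")));
console.log(toBigCommerceErrorCode(new BigCommerceServerError("bad gateway")));
console.log(toBigCommerceErrorCode(new Error("boom")));
```

This sketch assumes a compile target of ES2015 or later, where `instanceof` works for subclasses of `Error`.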
function Home() {
const [stats, setStats] = useState < Stats | null >( null );
const [fastifyCost, setFastifyCost] = useState <{
dailyTotal : number ;
budgetStatus : string ;
} | null >( null );
const [error, setError] = useState < string | null >( null );
useEffect (() => {
fetch ( "/api/orders" )
. then ((r) => r. json ())
. then ((data : { stats : Stats }) => { setStats (data.stats); })
. catch (() => { setError ( "Could not load order data" ); });
fetch ( "http://localhost:3001/api/metrics/cost" )
. then ((r) => r. json () as Promise <{ dailyTotal : number ; budgetStatus : string }>)
. then ((v) => { setFastifyCost (v); })
. catch (() => {});
}, []);
return (
<main style ={{ padding: "2rem" , fontFamily: "system-ui, sans-serif" }}>
<h1>Cohere Document Pipeline for BigCommerce SMB</h1>
{ error && <p style ={{ color: "red" }}>{ error }</p>}
<section style ={{ marginBottom: "2rem" }}>
<h2>Dashboard</h2>
{ stats ? (
<table>
<thead>
<tr>
<th style ={{ padding: "0.5rem" , textAlign: "center" , border: "1px solid #ccc" }}>Orders Processed</th>
<th style ={{ padding: "0.5rem" , textAlign: "center" , border: "1px solid #ccc" }}>Total LLM Cost</th>
<th style ={{ padding: "0.5rem" , textAlign: "center" , border: "1px solid #ccc" }}>Errors</th>
{ fastifyCost && <th style ={{ padding: "0.5rem" , textAlign: "center" , border: "1px solid #ccc" }}>Daily Cost</th>}
</tr>
</thead>
<tbody>
<tr>
<td style ={{ padding: "0.5rem" , textAlign: "center" , border: "1px solid #ccc" }}>{ stats . totalOrders }</td>
<td style ={{ padding: "0.5rem" , textAlign: "center" , border: "1px solid #ccc" }}>${ stats . totalCostUsd . toFixed ( 4 )}</td>
<td style ={{ padding: "0.5rem" , textAlign: "center" , border: "1px solid #ccc" }}>{ stats . totalErrors }</td>
{ fastifyCost && <td style ={{ padding: "0.5rem" , textAlign: "center" , border: "1px solid #ccc" }}>${ fastifyCost . dailyTotal . toFixed ( 4 )}</td>}
</tr>
</tbody>
</table>
) : (
<p>Loading stats...</p>
)}
</section>
{ stats && stats . recentOrders .length > 0 && (
<section style ={{ marginBottom: "2rem" }}>
<h2>Recent Orders</h2>
<table style ={{ width: "100%" , borderCollapse: "collapse" }}>
<thead>
<tr>
<th style ={{ padding: "0.5rem" , border: "1px solid #ccc" }}>ID</th>
<th style ={{ padding: "0.5rem" , border: "1px solid #ccc" }}>File</th>
<th style ={{ padding: "0.5rem" , border: "1px solid #ccc" }}>Status</th>
<th style ={{ padding: "0.5rem" , border: "1px solid #ccc" }}>Cost</th>
<th style ={{ padding: "0.5rem" , border: "1px solid #ccc" }}>Time</th>
</tr>
</thead>
<tbody>
{ stats . recentOrders . map (( o ) => (
<tr key ={ o . id }>
<td style ={{ padding: "0.5rem" , border: "1px solid #ccc" }}>{ o . id }</td>
<td style ={{ padding: "0.5rem" , border: "1px solid #ccc" }}>{ o . filename }</td>
<td style ={{ padding: "0.5rem" , border: "1px solid #ccc" }}>{ o . status }</td>
<td style ={{ padding: "0.5rem" , border: "1px solid #ccc" }}>${ o . costUsd . toFixed ( 4 )}</td>
<td style ={{ padding: "0.5rem" , border: "1px solid #ccc" }}>{new Date ( o . timestamp ). toLocaleTimeString ()}</td>
</tr>
))}
</tbody>
</table>
</section>
)}
<section style ={{ marginBottom: "2rem" }}>
<h2>API Endpoints</h2>
<ul>
<li><code>POST /api/ingest</code> (Fastify, port 3001) — Upload a PDF, DOCX, or image file</li>
<li><code>GET /api/health</code> — Health check</li>
<li><code>GET /api/metrics/cost</code> — Daily LLM cost summary (Fastify, port 3001)</li>
<li><code>GET /api/orders</code> — Recently processed orders</li>
<li><code>GET /api/orders/[id]</code> — Single order details</li>
</ul>
</section>
</main>
);
}