Small businesses often need to clean and transform CSV, JSON, or database exports but lack the infrastructure to safely execute LLM-generated code. Running it directly risks data corruption, runaway costs, or exposure of sensitive records.
This recipe builds an API service that accepts raw CSV, JSON, or SQL data, asks Claude to generate a data-cleaning transformation, executes that code in an isolated E2B sandbox, repairs any malformed output with structured repair, and returns the cleaned result. Along the way it tracks per-job token costs and supports idempotency for safe retries. It is built on Next.js 16 (App Router) and integrates four REAA packages for repair, confidence-based routing, cost telemetry, and idempotency middleware.
Prerequisites
Node.js >= 22 and pnpm@10 installed
An Anthropic API key for Claude (set as ANTHROPIC_API_KEY)
An E2B API key for the code sandbox (set as E2B_API_KEY)
Basic familiarity with TypeScript, Next.js App Router, and Zod schemas
Step 1: Scaffold the project and configure environment
Start from the scaffold directory — it ships with Next.js 16, TypeScript, Vitest, ESLint, and all third-party dependencies already pinned in package.json. Verify the key files are in place:
terminal
ls package.json next.config.ts tsconfig.json vitest.config.ts .env.example
Open .env.example — it lists the four environment variables the service needs:
env
# Env vars for anthropic-code-sandbox-for-smb-data-cleansing-pipelines
ANTHROPIC_API_KEY=<your-anthropic-key>
E2B_API_KEY=<your-e2b-api-key>
DEFAULT_DAILY_BUDGET=5.00
ANTHROPIC_MODEL=claude-sonnet-4-6
Copy it to .env.local and fill in your real API keys:
terminal
cp .env.example .env.local
Expected output: .env.local exists with your Anthropic key and E2B key in place.
Step 2: Define the shared Zod schemas and error classes
Create src/lib/schemas.ts — this file holds every Zod schema used across the pipeline, plus custom error classes for each failure mode. The format classifier, code generator, result validator, API route handler, and job manager all import from here.
Expected output: pnpm typecheck passes. No type errors are expected yet because nothing imports this file.
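The recipe does not list the contents of src/lib/schemas.ts, so the following is only a minimal sketch of how the four error classes might look. The class names come from the imports used in later steps; the shared PipelineError base class and the single-message constructor are assumptions made for illustration, and the Zod schemas are left out because their fields are not shown here.

```typescript
// Hypothetical sketch of the error classes exported from src/lib/schemas.ts.
// The four class names appear in the recipe; PipelineError is an assumed base class.
class PipelineError extends Error {
  constructor(message: string) {
    super(message);
    // Use the subclass name so logs show which failure mode occurred.
    this.name = new.target.name;
    // Keep instanceof working even when compiled to older targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

class FormatNotRecognizedError extends PipelineError {}
class BudgetExceededError extends PipelineError {}
class CodeExecutionError extends PipelineError {}
class ValidationError extends PipelineError {}
```

Giving each failure mode its own class lets callers branch with instanceof instead of parsing message strings, which is how the route handler in Step 11 maps errors to HTTP status codes.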
Step 3: Build the format classifier
Create src/services/format-classifier.ts. This function sends the raw data to Claude with a system prompt asking it to classify the format, validates the JSON response through repair() (from @reaatech/structured-repair-core), then feeds the predictions to ConfidenceRouter to decide whether to route, clarify, or fall back.
ts
import Anthropic from "@anthropic-ai/sdk";
import { ConfidenceRouter } from "@reaatech/confidence-router";
import { repair, UnrepairableError } from "@reaatech/structured-repair-core";
import { FormatClassificationSchema, FormatNotRecognizedError } from "../lib/schemas.js";

export async function classifyInputFormat(
  data: string,
): Promise<{ label: string; predictions: Array<{ label: string; confidence: number }> }> {
  const client = new Anthropic({
    apiKey: process.env.ANTHROPIC_API_KEY,
  });

  // Ask Claude to classify the raw input; instructions go in the system parameter.
  const message = await client.messages.create({
    model: process.env.ANTHROPIC_MODEL ?? "claude-sonnet-4-6",
    max_tokens: 512,
    system:
      "You are a data format classifier. Classify the input data as one of: csv, json, sql, or unknown. Return only valid JSON with predictions array.",
    messages: [
      {
        role: "user",
        content: data,
      },
    ],
  });

  const block = message.content[0];
  if (block.type !== "text") {
    throw new FormatNotRecognizedError("No text block in response");
  }
  const text = block.text;

  // Validate (and if needed repair) the JSON response against the schema.
  let classification: { predictions: Array<{ label: string; confidence: number }> };
  try {
    classification = await repair(FormatClassificationSchema, text);
  } catch (error) {
    if (error instanceof UnrepairableError) {
      throw new FormatNotRecognizedError(`Failed to parse classification: ${error.message}`);
    }
    throw error;
  }

  // Let the router decide whether the top prediction is confident enough.
  const router = new ConfidenceRouter({
    routeThreshold: 0.7,
    fallbackThreshold: 0.3,
  });
  const decision = router.decide({
    predictions: classification.predictions,
  });

  if (decision.type === "ROUTE") {
    const target = decision.target;
    if (target === undefined) {
      throw new FormatNotRecognizedError("No target in routing decision");
    }
    return { label: target, predictions: classification.predictions };
  }
  if (decision.type === "CLARIFY") {
    return { label: "CLARIFY", predictions: classification.predictions };
  }
  throw new FormatNotRecognizedError("Format could not be determined");
}
Expected output: pnpm typecheck passes. The classifier passes its instructions through the SDK's system parameter rather than prepending them to the user message, and it hands the predictions to ConfidenceRouter.decide().
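To make the two thresholds concrete, here is a small stand-in for the routing decision. It is not the ConfidenceRouter implementation, whose internals are not shown in this recipe. It assumes that the router looks only at the highest-confidence prediction, routes at or above routeThreshold, falls back below fallbackThreshold, and asks for clarification in between. The decide function and the "FALLBACK" decision type are hypothetical names.

```typescript
// Stand-in for the routing decision, assuming a simple top-prediction rule.
// This is NOT the @reaatech/confidence-router algorithm, only an illustration.
type Prediction = { label: string; confidence: number };

type Decision =
  | { type: "ROUTE"; target: string }
  | { type: "CLARIFY" }
  | { type: "FALLBACK" };

function decide(
  predictions: Prediction[],
  routeThreshold = 0.7,
  fallbackThreshold = 0.3,
): Decision {
  // Pick the prediction with the highest confidence, if any.
  let top: Prediction | undefined;
  for (const prediction of predictions) {
    if (top === undefined || prediction.confidence > top.confidence) {
      top = prediction;
    }
  }

  // No predictions, or the best one is too weak: give up on classification.
  if (top === undefined || top.confidence < fallbackThreshold) {
    return { type: "FALLBACK" };
  }
  // Confident enough to act on the label directly.
  if (top.confidence >= routeThreshold) {
    return { type: "ROUTE", target: top.label };
  }
  // In between: the caller should ask for clarification.
  return { type: "CLARIFY" };
}
```

Under these assumptions, a top prediction of 0.92 for csv routes to csv, two near-equal predictions around 0.5 yield CLARIFY, and a single prediction of 0.2 falls back, which the classifier above turns into a FormatNotRecognizedError.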
Step 4: Create the cost tracker
Create src/services/cost-tracker.ts. This wraps @reaatech/llm-cost-telemetry to record cost spans per API call and enforce a daily budget ceiling.
Expected output: pnpm typecheck passes. enforceBudgetOrThrow takes a feature: string parameter and reads the daily budget from loadConfig().
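The file itself is not reproduced in this step, so the sketch below shows one way the tracker's public surface could behave using plain in-memory bookkeeping instead of @reaatech/llm-cost-telemetry. The method names (recordSpan, getSessionCost, enforceBudgetOrThrow, reset) and the recordSpan argument order match how the recipe uses them. Everything else is an assumption: the budget is passed to the constructor rather than read from loadConfig(), the price table and its keys are hypothetical, and spending exactly equal to the budget is treated as still allowed.

```typescript
// In-memory sketch of the CostTracker surface. It does not use
// @reaatech/llm-cost-telemetry; prices and budget handling are assumptions.
class BudgetExceededError extends Error {}

// Hypothetical price table entry: USD per one million tokens.
type Price = { inputPerMTok: number; outputPerMTok: number };

class CostTracker {
  private sessionCostUsd = 0;
  private readonly costByFeature = new Map<string, number>();
  private readonly dailyBudgetUsd: number;
  private readonly prices: Record<string, Price>;

  constructor(dailyBudgetUsd: number, prices: Record<string, Price>) {
    this.dailyBudgetUsd = dailyBudgetUsd;
    this.prices = prices;
  }

  // Record one API call and return its cost in USD.
  recordSpan(
    provider: string,
    model: string,
    inputTokens: number,
    outputTokens: number,
    feature: string,
  ): number {
    const price = this.prices[`${provider}/${model}`];
    if (price === undefined) {
      throw new Error(`No price configured for ${provider}/${model}`);
    }
    const cost =
      (inputTokens * price.inputPerMTok + outputTokens * price.outputPerMTok) / 1_000_000;
    this.sessionCostUsd += cost;
    this.costByFeature.set(feature, (this.costByFeature.get(feature) ?? 0) + cost);
    return cost;
  }

  getSessionCost(): number {
    return this.sessionCostUsd;
  }

  // Assumption: only spending strictly above the budget is rejected.
  enforceBudgetOrThrow(feature: string): void {
    if (this.sessionCostUsd > this.dailyBudgetUsd) {
      throw new BudgetExceededError(
        `Daily budget of $${this.dailyBudgetUsd.toFixed(2)} exceeded before running "${feature}"`,
      );
    }
  }

  reset(): void {
    this.sessionCostUsd = 0;
    this.costByFeature.clear();
  }
}
```

Calling enforceBudgetOrThrow before each Claude request, as the code generator does in the next step, means a job that would push spending further over the ceiling fails fast instead of incurring another API charge.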
Step 5: Build the code generator
Create src/services/code-generator.ts. This function enforces the budget first, then asks Claude to generate a data-cleaning JavaScript function, validates the structured JSON response through repair(), and records the cost span.
ts
import Anthropic from "@anthropic-ai/sdk";
import { repair, UnrepairableError } from "@reaatech/structured-repair-core";
import { CodeGenOutputSchema, ValidationError } from "../lib/schemas.js";
import type { CostTracker } from "./cost-tracker.js";

export async function generateTransformationCode(
  costTracker: CostTracker,
  inputData: string,
  format: string,
  instructions?: string,
): Promise<{ code: string; description: string; tokensUsed: { input: number; output: number } }> {
  // Refuse to call the API at all once the daily budget is exhausted.
  costTracker.enforceBudgetOrThrow("code-gen");

  const client = new Anthropic({
    apiKey: process.env.ANTHROPIC_API_KEY,
  });
  const model = process.env.ANTHROPIC_MODEL ?? "claude-sonnet-4-6";

  const message = await client.messages.create({
    model,
    max_tokens: 4096,
    system:
      "You are a data transformation expert. Output ONLY valid JSON with keys: code, description, expectedOutputShape. The code must be a self-contained JavaScript function that takes the input data string and returns cleaned output.",
    messages: [
      {
        role: "user",
        content: `Transform this ${format} data:\n\n${inputData}\n\nInstructions: ${instructions ?? "Clean and normalize"}`,
      },
    ],
  });

  const block = message.content[0];
  if (block.type !== "text") {
    throw new ValidationError("No text block in response");
  }
  const text = block.text;

  // Validate (and if needed repair) the structured JSON response.
  let output: { code: string; description: string; expectedOutputShape: string };
  try {
    output = await repair(CodeGenOutputSchema, text);
  } catch (error) {
    if (error instanceof UnrepairableError) {
      throw new ValidationError(`Failed to parse code generation output: ${error.message}`);
    }
    throw error;
  }

  // Record token usage so the cost of this call counts against the budget.
  costTracker.recordSpan(
    "anthropic",
    model,
    message.usage.input_tokens,
    message.usage.output_tokens,
    "code-gen",
  );

  return {
    code: output.code,
    description: output.description,
    tokensUsed: {
      input: message.usage.input_tokens,
      output: message.usage.output_tokens,
    },
  };
}
Expected output: pnpm typecheck passes. enforceBudgetOrThrow runs before the API call, and Claude’s response passes through repair() before it is returned.
Step 6: Build the sandbox executor
Create src/services/sandbox-executor.ts. This class wraps the E2B Code Interpreter sandbox: it embeds the generated code and input data in a Python script, runs the script inside an ephemeral sandbox with a 30-second timeout, and always cleans up via sbx.kill().
Expected output: pnpm typecheck passes. The executor wraps input data into transform_input = """...""" and runs the generated code inside the E2B sandbox.
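Embedding arbitrary user data in a triple-quoted Python string only works if backslashes and quotes in the data are escaped first. The executor's source is not shown here, so the following is a sketch of that one step under stated assumptions: the helper names escapeForPythonTripleQuotes and buildSandboxScript are hypothetical, the generated code is assumed to be script text that reads transform_input, and the E2B calls themselves are omitted.

```typescript
// Sketch of the script-wrapping step only; no E2B calls are made here.
// Helper names are hypothetical and the escaping strategy is an assumption.

// Escape data so it can sit inside a Python """...""" literal unchanged.
function escapeForPythonTripleQuotes(raw: string): string {
  return raw
    .replace(/\\/g, "\\\\") // backslashes first, so later escapes are not doubled
    .replace(/"/g, '\\"'); // every double quote, so the data can never close the literal
}

// Build the script text: the input assignment followed by the generated code.
function buildSandboxScript(generatedCode: string, inputData: string): string {
  return [
    `transform_input = """${escapeForPythonTripleQuotes(inputData)}"""`,
    "",
    generatedCode,
  ].join("\n");
}
```

Escaping every double quote is stricter than needed, but it rules out input that ends in a quote or contains a run of three quotes, either of which would otherwise terminate the literal early and let data be interpreted as code.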
Step 7: Create the result validator
Create src/services/result-validator.ts. After the sandbox returns raw output, this function applies repairOutput() from @reaatech/structured-repair-core to validate and fix any malformed output against a Zod schema.
Expected output: pnpm typecheck passes. repairOutput() is properly awaited via await Promise.resolve().
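As an illustration of what validating and fixing sandbox output can involve, the sketch below is a hand-rolled stand-in. It does not call repairOutput() and makes no claim about how that function works. It assumes the sandbox prints a JSON array of row objects, sometimes wrapped in a Markdown code fence, and that anything else is a validation failure. The Row type and the stripCodeFence helper are hypothetical.

```typescript
// Hand-rolled stand-in for the validation step. It does not use
// @reaatech/structured-repair-core; the expected output shape is an assumption.
class ValidationError extends Error {}

type Row = Record<string, unknown>;

// Matches text wrapped in a Markdown code fence (three backticks, optional language tag).
const FENCE_PATTERN = new RegExp("^`{3}[a-zA-Z]*\\n([\\s\\S]*?)\\n?`{3}$");

// Remove a surrounding code fence, if present, and trim whitespace.
function stripCodeFence(raw: string): string {
  const trimmed = raw.trim();
  const match = FENCE_PATTERN.exec(trimmed);
  return match?.[1] ?? trimmed;
}

function isRow(value: unknown): value is Row {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Parse the sandbox output and check that it is an array of row objects.
function validateResult(rawOutput: string): Row[] {
  let parsed: unknown;
  try {
    parsed = JSON.parse(stripCodeFence(rawOutput));
  } catch {
    throw new ValidationError("Sandbox output is not valid JSON");
  }
  if (!Array.isArray(parsed) || !parsed.every(isRow)) {
    throw new ValidationError("Expected a JSON array of row objects");
  }
  return parsed;
}
```

A schema-driven repair library can handle many more cases than this, but the control flow is the same: attempt a fix, validate the shape, and raise a typed error that the route handler can map to a 422 response.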
Step 8: Wire the idempotency middleware
Create src/services/idempotency.ts. This module uses top-level await to create a singleton IdempotencyMiddleware backed by an in-memory adapter with a 24-hour TTL, scoped to POST requests.
ts
import { MemoryAdapter, IdempotencyMiddleware } from "@reaatech/idempotency-middleware";

const storage = new MemoryAdapter();
await storage.connect();

export const middleware = new IdempotencyMiddleware(storage, {
  ttl: 86_400_000, // 24 hours in milliseconds
  methods: ["POST"],
});
Expected output: pnpm typecheck passes. The singleton is imported directly by the route handler.
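The idea behind keyed idempotency with a TTL can be sketched without the library. The class below is a simplified stand-in, not the IdempotencyMiddleware implementation: it takes only a key and a handler, whereas the real execute call in Step 11 also receives request details. It assumes that a repeated key within the TTL replays the first result, that an in-flight call is shared with concurrent retries, and that failures are not cached. The InMemoryIdempotency name and the injectable clock are hypothetical.

```typescript
// Simplified stand-in for keyed idempotency with a TTL. This is NOT the
// @reaatech/idempotency-middleware implementation; its behavior is assumed.
type Entry = { expiresAt: number; result: Promise<unknown> };

class InMemoryIdempotency {
  private readonly entries = new Map<string, Entry>();
  private readonly ttlMs: number;
  private readonly now: () => number;

  // The clock is injectable so expiry can be exercised without waiting.
  constructor(ttlMs: number, now: () => number = Date.now) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  execute<T>(key: string, handler: () => Promise<T>): Promise<T> {
    const cached = this.entries.get(key);
    if (cached !== undefined && cached.expiresAt > this.now()) {
      // Same key within the TTL: replay the stored (or in-flight) result.
      return cached.result as Promise<T>;
    }

    const result = handler();
    this.entries.set(key, { expiresAt: this.now() + this.ttlMs, result });

    // Assumption: failures are not cached, so a retry can run the handler again.
    result.catch(() => {
      if (this.entries.get(key)?.result === result) {
        this.entries.delete(key);
      }
    });
    return result;
  }
}
```

With this behavior a client that times out and retries with the same key gets the original job result instead of triggering a second, separately billed pipeline run.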
Step 9: Create the Job Manager orchestrator
Create src/services/job-manager.ts. This is the pipeline orchestrator. It receives all five services (classifier, cost tracker, generator, executor, validator) through its constructor and runs them in sequence: classify → generate → execute → validate → return.
Expected output: pnpm typecheck passes. All five services are injected via the constructor, not imported directly.
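Since the orchestrator's source is not listed, here is a minimal sketch of constructor injection for this pipeline. The class name, the runTransformationPipeline and getJob methods, and the five dependency keys match how the route handler in Step 11 uses them. The request shape, the executor's execute method, the validator's signature, the job ID scheme, and the result fields are all hypothetical, and handling of the "CLARIFY" label is left out.

```typescript
// Sketch of the orchestrator with constructor injection. Method names on the
// executor and validator, and the request/result shapes, are assumptions.
type Predictions = Array<{ label: string; confidence: number }>;

interface CostTrackerLike {
  enforceBudgetOrThrow(feature: string): void;
}

interface JobManagerDeps {
  classifier: {
    classifyInputFormat(data: string): Promise<{ label: string; predictions: Predictions }>;
  };
  costTracker: CostTrackerLike;
  generator: {
    generateTransformationCode(
      costTracker: CostTrackerLike,
      inputData: string,
      format: string,
      instructions?: string,
    ): Promise<{ code: string; description: string }>;
  };
  executor: { execute(code: string, inputData: string): Promise<string> }; // hypothetical
  validator: { validateResult(rawOutput: string): Promise<unknown> }; // hypothetical
}

type JobRequest = { data: string; instructions?: string }; // hypothetical shape
type JobResult = { id: string; format: string; description: string; output: unknown };

class JobManager {
  private readonly jobs = new Map<string, JobResult>();
  private readonly deps: JobManagerDeps;
  private nextId = 1;

  constructor(deps: JobManagerDeps) {
    this.deps = deps;
  }

  // classify -> generate -> execute -> validate -> return
  async runTransformationPipeline(request: JobRequest): Promise<JobResult> {
    const { classifier, costTracker, generator, executor, validator } = this.deps;

    const { label } = await classifier.classifyInputFormat(request.data);
    const { code, description } = await generator.generateTransformationCode(
      costTracker,
      request.data,
      label,
      request.instructions,
    );
    const rawOutput = await executor.execute(code, request.data);
    const output = await validator.validateResult(rawOutput);

    const job: JobResult = { id: `job-${this.nextId++}`, format: label, description, output };
    this.jobs.set(job.id, job);
    return job;
  }

  getJob(id: string): JobResult | undefined {
    return this.jobs.get(id);
  }
}
```

Because every collaborator arrives through the constructor, tests can pass in stubs that return canned values and assert on call order without touching the Anthropic or E2B APIs.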
Step 10: Preload environment variables
Create src/env.ts to import dotenv/config at the top of the module graph. Next.js loads .env files natively, but importing dotenv here ensures the REAA packages can discover env vars during startup.
ts
import "dotenv/config";
Expected output: pnpm typecheck passes. The file is a one-liner.
Step 11: Create the API route handlers
Create app/api/jobs/route.ts — the main POST endpoint that accepts transformation requests, wires the idempotency middleware, runs the pipeline, and maps typed errors to HTTP status codes.
ts
import { type NextRequest, NextResponse } from "next/server";
import { z } from "zod";
import { IdempotencyError } from "@reaatech/idempotency-middleware";
import {
  JobRequestSchema,
  FormatNotRecognizedError,
  BudgetExceededError,
  CodeExecutionError,
  ValidationError,
} from "@/src/lib/schemas.js";
import { classifyInputFormat } from "@/src/services/format-classifier.js";
import { CostTracker } from "@/src/services/cost-tracker.js";
import { generateTransformationCode } from "@/src/services/code-generator.js";
import { SandboxExecutor } from "@/src/services/sandbox-executor.js";
import { validateResult } from "@/src/services/result-validator.js";
import { JobManager } from "@/src/services/job-manager.js";
import { middleware } from "@/src/services/idempotency.js";

const costTracker = new CostTracker();
const sandboxExecutor = new SandboxExecutor();

// Wire the five services into the orchestrator once per server process.
export const jobManager = new JobManager({
  classifier: { classifyInputFormat },
  costTracker,
  generator: { generateTransformationCode },
  executor: sandboxExecutor,
  validator: { validateResult },
});

export async function POST(req: NextRequest) {
  try {
    const rawBody: unknown = await req.json();
    const parsed = JobRequestSchema.parse(rawBody);
    const idempotencyKey = req.headers.get("idempotency-key");

    const runPipeline = async () => {
      return jobManager.runTransformationPipeline(parsed);
    };

    // Only go through the idempotency middleware when the client sent a key.
    let result;
    if (idempotencyKey) {
      result = await middleware.execute(
        idempotencyKey,
        { method: "POST", path: "/api/jobs", body: parsed },
        runPipeline,
      );
    } else {
      result = await runPipeline();
    }
    return NextResponse.json(result, { status: 200 });
  } catch (error) {
    // Map each typed failure to an HTTP status code.
    if (error instanceof z.ZodError) {
      return NextResponse.json(
        { error: "Validation failed", details: error.issues },
        { status: 422 },
      );
    }
    if (error instanceof FormatNotRecognizedError) {
      return NextResponse.json({ error: error.message }, { status: 422 });
    }
    if (error instanceof BudgetExceededError) {
      return NextResponse.json({ error: error.message }, { status: 402 });
    }
    if (error instanceof CodeExecutionError) {
      return NextResponse.json({ error: error.message }, { status: 500 });
    }
    if (error instanceof ValidationError) {
      return NextResponse.json({ error: error.message }, { status: 422 });
    }
    if (error instanceof IdempotencyError) {
      return NextResponse.json(
        { error: error.message },
        { status: error.getStatusCode() },
      );
    }
    throw error;
  }
}
Expected output: pnpm typecheck passes. All route handler params use NextRequest and all responses use NextResponse.json(), with no bare Request or new Response().
Now create app/api/jobs/[id]/route.ts — a simple GET endpoint to check job status by ID.
ts
import { type NextRequest, NextResponse } from "next/server";
import { jobManager } from "../route.js";

export async function GET(
  _req: NextRequest,
  { params }: { params: Promise<{ id: string }> },
) {
  const { id } = await params;
  const job = jobManager.getJob(id);
  if (job) {
    return NextResponse.json({ id, status: "completed" });
  }
  return NextResponse.json({ error: "not found" }, { status: 404 });
}
Expected output: pnpm typecheck passes. This uses Next.js 16’s async params pattern.
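From the client side, a safe retry means resending the same request with the same idempotency-key header. The sketch below builds such a request for the POST endpoint. The /api/jobs path and the header name come from the route handler; the buildJobRequest helper and the body fields (data, instructions) are hypothetical, since JobRequestSchema is not shown in this recipe.

```typescript
// Sketch of a client-side request builder for POST /api/jobs.
// The body shape is an assumption; adjust it to match JobRequestSchema.
type JobRequestBody = { data: string; instructions?: string };

function buildJobRequest(baseUrl: string, body: JobRequestBody, idempotencyKey: string) {
  return {
    // Trim a trailing slash so the path is not doubled.
    url: `${baseUrl.replace(/\/$/, "")}/api/jobs`,
    init: {
      method: "POST",
      headers: {
        "content-type": "application/json",
        // Reuse the same key on every retry of the same logical job.
        "idempotency-key": idempotencyKey,
      },
      body: JSON.stringify(body),
    },
  };
}
```

The returned url and init can be passed straight to fetch(url, init). Generating the key once per job, for example with a UUID, and reusing it across retries is what lets the server recognize a duplicate.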
Step 12: Update the entry point and run the tests
Update src/index.ts with re-exports for programmatic consumers:
ts
// This recipe's entry point is the Next.js App Router at `app/api/jobs/route.ts`.
// This file exists for programmatic consumers who want to import the services directly.
export { JobManager } from "./services/job-manager.js";
export { CostTracker } from "./services/cost-tracker.js";
export { SandboxExecutor } from "./services/sandbox-executor.js";
export {
  FormatNotRecognizedError,
  BudgetExceededError,
  CodeExecutionError,
  ValidationError,
} from "./lib/schemas.js";
export * from "./lib/schemas.js";
Now run the quality gates in sequence:
terminal
pnpm typecheck
Expected output: tsc --noEmit exits with code 0.
terminal
pnpm lint
Expected output: ESLint exits with code 0.
terminal
pnpm vitest run --coverage --reporter=json --outputFile=vitest-report.json
Expected output: All tests pass, with coverage at or above 90% on lines, branches, functions, and statements. The test suite includes:
CostTracker — recordSpan, getSessionCost, enforceBudgetOrThrow (under budget, over budget, exactly at limit), reset
Format classifier — CSV and JSON classification, low-confidence fallback, unrepairable JSON, non-text response, ambiguous predictions