Anthropic Document Pipeline for Klaviyo SMB Email Campaigns
Generate personalized marketing email content from product feeds and customer segments using Anthropic Claude, and push directly to Klaviyo for SMB campaign execution.
Small business marketers spend hours manually creating email campaigns, repurposing product data and customer segments across multiple channels without consistency.
A complete, working implementation of this recipe — downloadable as a zip or browsable file by file. Generated by our build pipeline; tested with full coverage before publishing.
This tutorial walks you through building an email campaign automation pipeline that loads product catalogs, fetches customer segments from Klaviyo, drafts personalized email content with Anthropic Claude, repairs the LLM output with Zod-driven validation, and pushes the final campaigns back to Klaviyo. You’ll use REAA packages for budget enforcement, semantic caching, structured output repair, prompt versioning, and cost telemetry. By the end you’ll have a Next.js API that can generate and schedule segmented email campaigns with a single HTTP call.
Prerequisites
Node.js >= 22 and pnpm installed on your machine
A Klaviyo account with an API key that has campaign and segment read/write permissions
An Anthropic API key for Claude
A VoyageAI API key for embedding generation (used by the semantic cache)
(Optional) A Langfuse account for LLM cost telemetry tracing
Familiarity with Next.js App Router patterns and TypeScript
Step 1: Scaffold the project and install dependencies
Create a new Next.js project with pnpm. This sets up the App Router shell, TypeScript config, ESLint, and Vitest.
Expected output:package.json shows the dependencies above with exact version pins (no ^ or ~ prefixes), and .env has your keys set.
Step 2: Create the Klaviyo API client wrapper
The klaviyo-api package provides typed API classes for campaigns, segments, and profiles. Create a wrapper class that instantiates them from a shared API key session.
terminal
mkdir -p src/lib
Write src/lib/klaviyoClient.ts:
ts
import { ApiKeySession, CampaignsApi, ProfilesApi, SegmentsApi, RetryWithExponentialBackoff, type CampaignCreateQuery, type CampaignSendJobCreateQuery, type ProfileCreateQuery,} from "klaviyo-api";export class KlaviyoClient { private session: ApiKeySession; campaignsApi: CampaignsApi; profilesApi: ProfilesApi; segmentsApi: SegmentsApi; constructor( apiKey: string, retry?: RetryWithExponentialBackoff, ) { this.session = new ApiKeySession(apiKey, retry); this.campaignsApi = new CampaignsApi(this.session); this.profilesApi = new ProfilesApi(this.session); this.segmentsApi = new SegmentsApi(this.session); } async getSegments(filter?: string) { const result = await this.segmentsApi.getSegments({ filter }); return result.body; } async createCampaign(params: CampaignCreateQuery) { const result = await this.campaignsApi.createCampaign(params); return result.body; } async sendCampaign(params: CampaignSendJobCreateQuery) { const result = await this.campaignsApi.sendCampaign(params); return result.body; } async getCampaign(id: string, options?: Record<string, unknown>) { const result = await this.campaignsApi.getCampaign(id, options); return result.body; } async getCampaigns(filter?: string) { const result = await this.campaignsApi.getCampaigns(filter ?? ""); return result.body; } async createProfile(profile: ProfileCreateQuery) { const result = await this.profilesApi.createProfile(profile); return result.body; }}
Expected output: TypeScript compiles without errors when you run pnpm typecheck. The class exposes typed methods that delegate to the Klaviyo SDK’s internal API objects.
Step 3: Build the Anthropic LLM client
The @anthropic-ai/sdk default export is the Anthropic client. Wrap it in an LLMClient that handles message creation with a configured model and max tokens. Include a streaming variant for real-time use cases.
Expected output:pnpm typecheck passes. The generate() method calls client.messages.create() with the standard Anthropic message format and extracts text from the response content blocks.
Step 4: Add budget enforcement, caching, and telemetry services
Your pipeline needs three supporting services that layer on top of REAA packages.
Budget service
Uses @reaatech/agent-budget-engine and @reaatech/agent-budget-spend-tracker to define a daily spend cap and check each generation against it.
Expected output:pnpm typecheck passes. Each service is instantiable with config options and exposes the methods the pipeline calls.
Step 5: Load and chunk product feeds
The ProductFeedLoader fetches product catalogs from a JSON URL or scrapes a webpage using Mozilla’s Readability, then converts them into typed Document objects compatible with @reaatech/hybrid-rag.
terminal
mkdir -p src/services
Write src/services/product-feed.ts:
ts
import { type Document, type Chunk, type ChunkingConfig } from "@reaatech/hybrid-rag";import { RecursiveCharacterTextSplitter } from "@langchain/textsplitters";import { Readability, isProbablyReaderable } from "@mozilla/readability";import { JSDOM } from "jsdom";export interface ProductFeedItem { id: string; title: string; description: string; price: number; category: string; imageUrl: string; sku: string;}export class ProductFeedLoader { async loadFromJson(url: string): Promise<ProductFeedItem[]> { const response = await fetch(url); if (!response.ok) { throw new Error(`Failed to fetch product feed: ${String(response.status)} ${response.statusText}`); } const data = (await response.json()) as ProductFeedItem[]; return data; } async loadFromWebPage(url: string): Promise<string> { const response = await fetch(url); if (!response.ok) { throw new Error(`Failed to fetch web page: ${String(response.status)} ${response.statusText}`); } const html = await response.text(); const dom = new JSDOM(html, { url }); const doc = dom.window.document; if (!isProbablyReaderable(doc)) { return ""; } const reader = new Readability(doc); const article = reader.parse(); return article ? (article.textContent ?? "") : ""; } toDocuments(items: ProductFeedItem[]): Document[] { return items.map((item) => ({ id: item.id, content: item.description, source: "product-feed", metadata: { title: item.title, price: item.price, category: item.category, sku: item.sku, }, })); } async chunkDocuments(docs: Document[], config: ChunkingConfig): Promise<Chunk[]> { const splitter = new RecursiveCharacterTextSplitter({ chunkSize: config.chunkSize, chunkOverlap: config.overlap, }); const chunks: Chunk[] = []; for (const doc of docs) { const texts = await splitter.splitText(doc.content); for (let idx = 0; idx < texts.length; idx++) { const text = texts[idx]; chunks.push({ id: `${doc.id}-chunk-${String(idx)}`, documentId: doc.id, content: text, tokenCount: text.length, position: idx, metadata: doc.metadata, index: idx, characterCount: text.length, startPosition: idx * 200, endPosition: idx * 200 + text.length, strategy: "recursive", } as Chunk); } } return chunks; }}
Expected output:loadFromJson returns a typed array of products. toDocuments converts them to the Document shape. chunkDocuments uses LangChain’s text splitter to break long documents into smaller chunks with full metadata for retrieval.
Step 6: Generate email content with Claude
The ContentGenerator builds a prompt that describes the campaign goal, target segment, and product catalog, then sends it to Claude. It also includes a token estimator using js-tiktoken.
Write src/services/content-generator.ts:
ts
import { z } from "zod";import { getEncoding } from "js-tiktoken";import Anthropic from "@anthropic-ai/sdk";import type { ProductFeedItem } from "./product-feed.js";export const EmailContentSchema = z.object({ subject: z.string(), previewText: z.string(), bodyHtml: z.string(), bodyText: z.string(),});export type EmailContent = z.infer<typeof EmailContentSchema>;export interface GenerationRequest { segmentName: string; products: ProductFeedItem[]; campaignGoal: string; tone: string;}export class ContentGenerator { client: Anthropic; model: string; maxTokens: number; constructor(client: Anthropic, model: string, maxTokens?: number) { this.client = client; this.model = model; this.maxTokens = maxTokens ?? 1024; } async generateEmailContent(request: GenerationRequest): Promise<string> { const productLines = request.products .map((p) => `- ${p.title}: ${p.description} $${String(p.price)}`) .join("\n"); const systemPrompt = "You are an expert email marketing copywriter for SMBs. " + "Generate a JSON email campaign with fields: subject, previewText, bodyHtml, bodyText. " + "Return ONLY valid JSON — no markdown fences, no extra text."; const userPrompt = `Campaign goal: ${request.campaignGoal}Segment: ${request.segmentName}Tone: ${request.tone}Products:${productLines}Generate the email JSON:`; const message = await this.client.messages.create({ model: this.model, max_tokens: this.maxTokens, system: systemPrompt, messages: [{ role: "user", content: userPrompt }], }); const block = message.content[0]; if (block.type === "text") { return block.text; } return ""; } estimateTokens(text: string): number { if (text.length === 0) return 0; try { const enc = getEncoding("cl100k_base"); return enc.encode(text).length; } catch { return Math.ceil(text.length / 4); } }}
Expected output: The generator sends a structured prompt to Claude asking for JSON email content. The estimateTokens method returns accurate token counts using the cl100k_base encoding.
Step 7: Repair LLM output with structured-repair-core
LLMs sometimes return malformed JSON — markdown fences, trailing commas, truncated responses. The ContentRepair class uses @reaatech/structured-repair-core to fix those issues against your Zod schema.
Expected output: Pass valid JSON and it comes back unchanged. Pass JSON wrapped in triple-backtick fences, a trailing comma, or even free-form prose, and the repair pipeline produces valid EmailContent.
Step 8: Create the segment and campaign service wrappers
Two thin service classes that sit on top of KlaviyoClient.
Expected output: The segment service fetches and parses Klaviyo segments. The campaign service constructs a properly formatted campaign object and pushes it to Klaviyo.
Step 9: Build the email campaign pipeline
The EmailCampaignPipeline orchestrates the full workflow for every active segment: load products, check budget and cache, generate content with Claude, repair it, create the campaign, record spend, cache the result, and emit telemetry.
terminal
mkdir -p src/pipeline
Write src/pipeline/email-campaign.ts:
ts
import type { ContentGenerator, EmailContent } from "../services/content-generator.js";import type { ContentRepair } from "../services/content-repair.js";import type { ProductFeedLoader, ProductFeedItem } from "../services/product-feed.js";import type { SegmentService, CustomerSegment } from "../services/segment-service.js";import type { CampaignService } from "../services/campaign-service.js";import type { BudgetService } from "../lib/budget-service.js";import type { CacheService } from "../lib/cache-service.js";import type { TelemetryService } from "../lib/telemetry-service.js";export interface EmailCampaignConfig {
Expected output: The pipeline iterates through each active Klaviyo segment, generates personalized email content, repairs malformed output, creates a campaign in Klaviyo, and returns results. Failed segments are skipped without crashing the entire run.
Write app/api/generate/route.ts — generates email content for a specific segment:
ts
import { type NextRequest, NextResponse } from "next/server";import { z } from "zod";import Anthropic from "@anthropic-ai/sdk";import { ContentGenerator } from "../../../src/services/content-generator.js";import { ContentRepair } from "../../../src/services/content-repair.js";import { BudgetService } from "../../../src/lib/budget-service.js";import { ProductFeedLoader } from "../../../src/services/product-feed.js";import { SegmentService } from "../../../src/services/segment-service.js";import { CampaignService } from "../../../src/services/campaign-service.js";import { KlaviyoClient } from "../../../src/lib/klaviyoClient.js";
Write app/api/campaigns/route.ts — runs the full pipeline across all segments:
ts
import { type NextRequest, NextResponse } from "next/server";import { z } from "zod";import Anthropic from "@anthropic-ai/sdk";import { ContentGenerator } from "../../../src/services/content-generator.js";import { ContentRepair } from "../../../src/services/content-repair.js";import { ProductFeedLoader } from "../../../src/services/product-feed.js";import { SegmentService } from "../../../src/services/segment-service.js";import { CampaignService } from "../../../src/services/campaign-service.js";import { BudgetService } from "../../../src/lib/budget-service.js";import { CacheService } from "../../../src/lib/cache-service.js";
Update app/page.tsx with a landing page that describes the API:
tsx
import styles from "./page.module.css";export default function Home() { return ( <div className={styles.page}> <main className={styles.main}> <h1>Anthropic Document Pipeline for Klaviyo SMB Email Campaigns</h1> <p className={styles.description}> Generate personalized marketing email content from product feeds and customer segments using Anthropic Claude, and push directly to Klaviyo for SMB campaign execution. </p> <div className={styles.endpoints}> <h2>API Endpoints</h2> <ul> <li> <code>POST /api/generate</code> — Generate email content for a segment </li> <li> <code>POST /api/campaigns</code> — Run the full pipeline and create a campaign </li> <li> <code>POST /api/webhooks/klaviyo</code> — Receive Klaviyo event notifications </li> </ul> </div> <div className={styles.info}> <h2>How It Works</h2> <ol> <li>Load product feed from a JSON data source</li> <li>Retrieve active customer segments via Klaviyo API</li> <li>Pass products and segments to Anthropic Claude to draft email content</li> <li>Repair and validate output with structured-repair-core</li> <li>Create the campaign in Klaviyo</li> </ol> </div> <div className={styles.examples}> <h2>API Usage Examples</h2> <h3>Generate email content</h3> <pre>{`curl -X POST http://localhost:3000/api/generate \\\\ -H "Content-Type: application/json" \\\\ -d '{ "productFeedUrl": "https://example.com/feed.json", "segmentName": "VIP Customers", "campaignGoal": "Promote new arrivals", "tone": "professional" }'`}</pre> <h3>Create a campaign</h3> <pre>{`curl -X POST http://localhost:3000/api/campaigns \\\\ -H "Content-Type: application/json" \\\\ -d '{ "productFeedUrl": "https://example.com/feed.json", "listId": "YOUR_LIST_ID", "campaignGoal": "Weekly newsletter", "tone": "casual" }'`}</pre> <h3>Health check</h3> <pre>{`curl http://localhost:3000/api/generate`}</pre> </div> </main> </div> );}
Expected output:pnpm dev starts the Next.js dev server. POST /api/generate with a valid body returns a JSON response containing the generated email content and cost tracking.
Step 11: Set up the public API exports
Update src/index.ts to re-export every public class and type so consumers can import them as a library:
ts
// Entry point — re-exports public API for programmatic consumption.export { KlaviyoClient } from "./lib/klaviyoClient.js";export { LLMClient } from "./lib/llm-client.js";export { CacheService } from "./lib/cache-service.js";export { TelemetryService } from "./lib/telemetry-service.js";export { BudgetService } from "./lib/budget-service.js";export { ProductFeedLoader, type ProductFeedItem } from "./services/product-feed.js";export { ContentGenerator, type EmailContent, type GenerationRequest } from "./services/content-generator.js";export { ContentRepair } from "./services/content-repair.js";export { SegmentService, type CustomerSegment } from "./services/segment-service.js";export { CampaignService } from "./services/campaign-service.js";export { PromptService } from "./services/prompt-service.js";export { EmailCampaignPipeline, type EmailCampaignConfig, type EmailCampaignResult } from "./pipeline/email-campaign.js";
Step 12: Add tests and verify
Create a shared MSW test setup that mocks all external HTTP endpoints.
Write tests/setup.ts:
ts
import { setupServer } from "msw/node";import { http, HttpResponse } from "msw";import { beforeAll, afterEach, afterAll } from "vitest";export const server = setupServer( http.post("https://api.anthropic.com/v1/messages", () => HttpResponse.json({ id: "msg_test", type: "message", role: "assistant", model: "claude-sonnet-4-6", content: [{ type: "text", text: "mocked response" }], stop_reason: "end_turn", usage: { input_tokens: 10, output_tokens: 5 }, }), ), http.post("https://api.voyageai.com/v1/embeddings", () => HttpResponse.json({ data: [{ embedding: [0.1, 0.2, 0.3] }, { embedding: [0.4, 0.5, 0.6] }], model: "voyage-3", }), ), http.all("https://a.klaviyo.com/api/*", ({ request }) => { const url = new URL(request.url); if (url.pathname.includes("segments")) { return HttpResponse.json({ data: [ { id: "seg-1", attributes: { name: "VIP Customers" } }, { id: "seg-2", attributes: { name: "New Subscribers" } }, ], }); } if (url.pathname.includes("campaigns")) { if (request.method === "POST") { return HttpResponse.json({ data: { id: "camp-123" } }); } return HttpResponse.json({ data: [ { id: "camp-1", attributes: { name: "Test Campaign" } }, ], }); } if (url.pathname.includes("profiles")) { return HttpResponse.json({ data: [] }); } return HttpResponse.json({ data: [] }); }), http.get("https://example.com/*", ({ request }) => { const url = new URL(request.url); if (url.pathname.includes("feed.json")) { return HttpResponse.json([ { id: "p1", title: "Product 1", description: "Desc 1", price: 10, category: "Cat1", imageUrl: "https://example.com/img.jpg", sku: "SKU001" }, ]); } if (url.pathname.includes("not-found")) { return new Response(null, { status: 404, statusText: "Not Found" }); } if (url.pathname.includes("nonreaderable")) { return HttpResponse.text("<html><body><p>Hi</p></body></html>"); } return HttpResponse.text("<html><body><article><h1>Test Article That Is Long Enough For Reader View</h1><p>" + "This is a lengthy article about product descriptions. It contains enough text to pass the isProbablyReaderable check, which requires a minimum content length to determine if the page is suitable for parsing. ".repeat(5) + "</p></article></body></html>"); }),);beforeAll(() => { server.listen({ onUnhandledRequest: "error" }); });afterEach(() => { server.resetHandlers(); });afterAll(() => { server.close(); });
Create a test for the pipeline in tests/email-campaign.test.ts:
Expected output: All tests pass. Coverage meets the thresholds: lines 90%, branches 70%, functions 90%, statements 90% across all runtime code under src/**/*.ts and app/**/route.ts.
Next steps
Add Upstash Vector as a persistent vector store — replace the InMemoryAdapter with an Upstash Vector backend so cached embeddings survive server restarts and scale across instances
Wire in Prompt Version Control — fetch email generation prompts from a PVC server instead of hardcoding the system prompt, enabling A/B testing and version rollbacks
Add a campaign scheduling endpoint — extend the API to accept an ISO timestamp and call scheduleCampaign via Klaviyo’s CampaignSendJob API