Small businesses waste hours manually cross-referencing PDF and Word policy documents to stay compliant with changing regulations, risking fines and employee disputes when policies are outdated or contradictory.
This tutorial walks you through building an automated HR policy compliance pipeline. You’ll create a Next.js application that ingests PDF and DOCX policy documents, extracts structured policy clauses using Cohere’s command model, repairs malformed LLM output into guaranteed-valid JSON, identifies compliance gaps against regulations, and surfaces everything in a searchable dashboard. The pipeline uses REAA’s @reaatech/media-pipeline-mcp-core for orchestration, @reaatech/media-pipeline-mcp-doc-extraction for document extraction operations, and @reaatech/structured-repair-core to enforce Zod schemas on LLM output.
Prerequisites
Node.js >=22 and pnpm 10 installed
PostgreSQL instance running (local or remote) with pgvector extension available
Langfuse account (optional, for observability) — sign up at langfuse.com
Familiarity with TypeScript, Next.js App Router, and basic SQL
Step 1: Configure environment variables
The project reads several environment variables at runtime. Copy the example file and fill in your values.
terminal
cp .env.example .env
Open .env and replace each placeholder with your real credentials:
env
# Env vars used by cohere-document-pipeline-for-hr-policy-compliance.
# Keep placeholders only; never commit real values.
NODE_ENV=development
DATABASE_URL=<postgres-connection-string>
COHERE_API_KEY=<your-cohere-api-key>
LANGFUSE_PUBLIC_KEY=<langfuse-public-key>
LANGFUSE_SECRET_KEY=<langfuse-secret-key>
LANGFUSE_HOST=https://cloud.langfuse.com
Expected output: A .env file with all angle-bracket placeholders replaced by your real credentials. The .env.example serves as documentation for every variable the application reads — never commit .env to version control.
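Because the app cannot do anything useful without a database URL and a Cohere key, it can help to fail fast at startup when one is missing or still holds a placeholder. This is a minimal sketch, not part of the project: the helper name missingEnv and the choice of which variables count as required are assumptions (the Langfuse keys are treated as optional, matching the prerequisites).

```typescript
// Hypothetical startup check. The required list mirrors the .env file above;
// Langfuse keys are left out because observability is optional.
const REQUIRED_VARS = ["DATABASE_URL", "COHERE_API_KEY"] as const;

export function missingEnv(
  env: Record<string, string | undefined>,
  required: readonly string[] = REQUIRED_VARS,
): string[] {
  // A variable counts as missing if it is absent, blank, or still an <angle-bracket> placeholder.
  return required.filter((name) => {
    const value = env[name]?.trim();
    return !value || /^<.*>$/.test(value);
  });
}

// Possible usage when the server boots:
// const missing = missingEnv(process.env);
// if (missing.length > 0) throw new Error(`Missing env vars: ${missing.join(", ")}`);
```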
Step 2: Define the database schema with Drizzle ORM
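The application stores data in three PostgreSQL tables: documents (uploaded files), policy_clauses (extracted clauses with pgvector embeddings), and compliance_gaps (gap analysis results). Define them in src/db/schema.ts.
The embedding column uses the pgvector vector type with 768 dimensions. That number has to equal the output size of the embedding model you use: Cohere's embed-english-v3.0, for example, returns 1024-dimensional vectors, so with that model you would change the column (and the vector(768) in the Step 3 migration) to 1024. The relations() calls let Drizzle's relational query builder return joined results with full type inference.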
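pgvector accepts vectors as text literals of the form [x1,x2,...] and rejects a value whose length differs from the column's declared dimension. A small guard in application code surfaces a model/schema mismatch with a clear message instead of a database error. The helper below, toVectorLiteral, is a hypothetical sketch and not part of the project.

```typescript
// Hypothetical helper: format an embedding as a pgvector text literal,
// failing early if its length does not match the column dimension.
export const EMBEDDING_DIMENSIONS = 768; // must match vector(768) in the schema

export function toVectorLiteral(
  embedding: readonly number[],
  dimensions: number = EMBEDDING_DIMENSIONS,
): string {
  if (embedding.length !== dimensions) {
    throw new Error(
      `Embedding has ${embedding.length} dimensions; column expects ${dimensions}`,
    );
  }
  if (!embedding.every((x) => Number.isFinite(x))) {
    throw new Error("Embedding contains NaN or Infinity");
  }
  // pgvector text input format: square brackets around comma-separated numbers.
  return `[${embedding.join(",")}]`;
}
```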
Now create the Drizzle client at src/db/index.ts:
ts
import postgres from "postgres";
import { drizzle } from "drizzle-orm/postgres-js";
import * as schema from "./schema.js";

// One shared connection pool; passing the schema enables Drizzle's relational queries (db.query.*).
const client = postgres(process.env.DATABASE_URL ?? "");
export const db = drizzle(client, { schema });
Expected output: Two files — src/db/schema.ts defining three tables with relations, and src/db/index.ts exporting a configured Drizzle client.
Step 3: Create the database migration
Create src/db/migrate.ts — a one-shot migration runner that creates the pgvector extension and all three tables directly via raw SQL:
ts
import postgres from "postgres";

// One-shot migration: enable pgvector, create the three tables, and index the embeddings.
export async function migrate() {
  const connectionString = process.env.DATABASE_URL ?? "";
  const sql = postgres(connectionString);
  try {
    await sql.unsafe(`CREATE EXTENSION IF NOT EXISTS vector`);
    await sql.unsafe(`CREATE TABLE IF NOT EXISTS documents (
      id SERIAL PRIMARY KEY, title TEXT, filename TEXT, mime_type TEXT, content_text TEXT,
      source_type TEXT, status TEXT DEFAULT 'pending',
      created_at TIMESTAMP DEFAULT NOW(), updated_at TIMESTAMP DEFAULT NOW())`);
    await sql.unsafe(`CREATE TABLE IF NOT EXISTS policy_clauses (
      id SERIAL PRIMARY KEY, document_id INTEGER REFERENCES documents(id),
      clause_text TEXT, clause_type TEXT, jurisdiction TEXT, compliance_status TEXT,
      confidence NUMERIC, embedding vector(768), extracted_at TIMESTAMP DEFAULT NOW())`);
    await sql.unsafe(`CREATE TABLE IF NOT EXISTS compliance_gaps (
      id SERIAL PRIMARY KEY, jurisdiction TEXT, requirement TEXT, current_policy TEXT,
      gap_description TEXT, severity TEXT, recommended_action TEXT,
      created_at TIMESTAMP DEFAULT NOW())`);
    // HNSW indexes need pgvector 0.5.0 or later; vector_cosine_ops serves the <=> operator.
    await sql.unsafe(
      `CREATE INDEX IF NOT EXISTS idx_policy_clauses_embedding ON policy_clauses USING hnsw (embedding vector_cosine_ops)`,
    );
    console.log("Migration complete");
  } finally {
    // Close the connection even if a statement fails.
    await sql.end();
  }
}
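Then push the Drizzle schema to your database. pnpm db:push applies src/db/schema.ts through drizzle-kit; it does not call migrate(), so run migrate() once beforehand (for example from a short Node script) to make sure the vector extension exists before the embedding column is created:
terminal
pnpm db:push
The drizzle.config.ts file points drizzle-kit at src/db/schema.ts and reads DATABASE_URL from the environment.
Expected output: After running migrate() and pnpm db:push, your PostgreSQL database has three tables (documents, policy_clauses, compliance_gaps), the vector extension, and an HNSW index on the embedding column.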
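The HNSW index is built with vector_cosine_ops, so it accelerates pgvector's cosine-distance operator <=>, which returns 1 minus the cosine similarity of two vectors. To make that metric concrete, here is the same calculation in plain TypeScript. It is meant for intuition and unit tests only; in the running app the database does this work.

```typescript
// Cosine distance as used by pgvector's <=> operator: 1 - (a . b) / (|a| * |b|).
// 0 means same direction, 1 means orthogonal, 2 means opposite direction.
export function cosineDistance(a: readonly number[], b: readonly number[]): number {
  if (a.length !== b.length) throw new Error("Vectors must have the same length");
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  // The metric is undefined when either vector has zero length.
  if (normA === 0 || normB === 0) return Number.NaN;
  return 1 - dot / Math.sqrt(normA * normB);
}
```

Because the index only serves <=>, a similarity query has to order by embedding <=> query_vector for PostgreSQL to use it.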
Step 4: Build the document parser
The parser handles two formats — PDF via unpdf and DOCX via mammoth — and normalizes both into a common ParsedDocument interface. Create src/lib/document-parser.ts:
ts
import { extractText, getDocumentProxy } from "unpdf";
import mammoth from "mammoth";

export type DocumentInput = {
  buffer: ArrayBuffer;
  mimeType: string;
  filename: string;
};

export type ParsedDocument = {
  text: string;
  title: string;
  pageCount?: number;
};

export class UnsupportedDocumentError extends Error {
  code = "UNSUPPORTED_DOCUMENT_TYPE" as const;
  constructor(message?: string) {
    super(message ?? "Unsupported document type");
    this.name = "UnsupportedDocumentError";
  }
}

// PDF: load the document and merge the text of all pages into one string.
export async function parsePdf(buffer: ArrayBuffer, filename: string): Promise<ParsedDocument> {
  const pdf = await getDocumentProxy(new Uint8Array(buffer));
  const { text, totalPages } = await extractText(pdf, { mergePages: true });
  return { text, title: filename, pageCount: totalPages };
}

// DOCX: mammoth's raw-text mode discards all formatting.
export async function parseDocx(buffer: ArrayBuffer, filename: string): Promise<ParsedDocument> {
  const result = await mammoth.extractRawText({ buffer: Buffer.from(buffer) });
  return { text: result.value, title: filename };
}

// Dispatch on the declared MIME type.
export async function parseDocument(input: DocumentInput): Promise<ParsedDocument> {
  switch (input.mimeType) {
    case "application/pdf":
      return parsePdf(input.buffer, input.filename);
    case "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
      return parseDocx(input.buffer, input.filename);
    default:
      throw new UnsupportedDocumentError(`Unsupported mime type: ${input.mimeType}`);
  }
}
parsePdf uses getDocumentProxy to load the PDF and extractText with mergePages: true to return all pages as a single text block. parseDocx calls mammoth.extractRawText, which strips all formatting; that is exactly what you want when feeding raw text into an LLM. parseDocument inspects the MIME type and delegates to the correct parser, throwing UnsupportedDocumentError for anything that isn't a PDF or DOCX.
Expected output: A module that parses PDF and DOCX buffers into { text, title, pageCount? } objects, or throws UnsupportedDocumentError for unrecognized formats.
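parseDocument trusts the MIME type it receives, and browsers sometimes report a generic or wrong type for uploads. One option is to double-check the file's first bytes: PDFs start with the ASCII text %PDF-, and DOCX files are ZIP archives starting with PK\x03\x04. The sniffMimeType helper below is a hypothetical sketch, not part of the project; note that a ZIP signature alone does not prove the file is DOCX (XLSX and plain .zip files share it), so mammoth can still reject the file.

```typescript
// Hypothetical magic-byte check to cross-validate a client-supplied MIME type.
const PDF_MIME = "application/pdf";
const DOCX_MIME =
  "application/vnd.openxmlformats-officedocument.wordprocessingml.document";

function startsWith(bytes: Uint8Array, signature: readonly number[]): boolean {
  // Indexing past the end yields undefined, so short inputs never match.
  return signature.every((byte, i) => bytes[i] === byte);
}

export function sniffMimeType(buffer: ArrayBuffer): string | null {
  const bytes = new Uint8Array(buffer, 0, Math.min(buffer.byteLength, 8));
  // "%PDF-" in ASCII
  if (startsWith(bytes, [0x25, 0x50, 0x44, 0x46, 0x2d])) return PDF_MIME;
  // ZIP local file header "PK\x03\x04": DOCX is a ZIP container, but so are other formats.
  if (startsWith(bytes, [0x50, 0x4b, 0x03, 0x04])) return DOCX_MIME;
  return null;
}
```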
Step 5: Set up the Cohere LLM client
The Cohere client handles two LLM tasks: extracting policy clauses from document text, and identifying compliance gaps against regulations. Create src/lib/cohere.ts:
ts
import { CohereClientV2, CohereError, CohereTimeoutError } from "cohere-ai";

export class CohereLlmError extends Error {
  code = "COHERE_LLM_ERROR" as const;
  statusCode?: number;
  constructor(message: string, statusCode?: number) {
    super(message);
    this.name = "CohereLlmError";
    this.statusCode = statusCode;
  }
}

// Pass the key explicitly: without a token, the SDK falls back to CO_API_KEY, not COHERE_API_KEY.
export const cohere = new CohereClientV2({ token: process.env.COHERE_API_KEY });

// Return the first text block of a chat response, or "" if there is none.
function extractTextContent(response: {
  message: { content?: Array<{ type: string; text?: string }> };
}): string {
  const content = response.message.content;
  if (!content) return "";
  const textBlock = content.find((b) => b.type === "text");
  return textBlock?.text ?? "";
}

export async function extractPolicyClauses(text: string): Promise<string> {
  try {
    const response = await cohere.chat({
      model: "command-a-03-2025",
      messages: [
        {
          role: "system",
          content:
            "You are an HR compliance analyst. Extract all policy clauses as a JSON array. Each clause must have: clause_text (string), clause_type ('sick_leave'|'paternity_leave'|'overtime'|'termination'|'anti_discrimination'|'other'), compliance_status ('compliant'|'at_risk'|'non_compliant'), confidence (number 0-1).",
        },
        { role: "user", content: text },
      ],
    });
    return extractTextContent(response);
  } catch (error) {
    if (error instanceof CohereError) {
      throw new CohereLlmError(error.message, error.statusCode);
    }
    if (error instanceof CohereTimeoutError) {
      throw new CohereLlmError(error.message);
    }
    throw error;
  }
}

export async function identifyComplianceGaps(
  clauses: { clause_text: string; clause_type: string }[],
  jurisdiction: string,
): Promise<string> {
  try {
    const response = await cohere.chat({
      model: "command-a-03-2025",
      messages: [
        {
          role: "system",
          content:
            "You are an HR compliance analyst. Identify compliance gaps in the provided policy clauses for the given jurisdiction. Return a JSON array of gaps. " +
            "Each gap: jurisdiction (string), requirement (string), current_policy (string), gap_description (string), severity ('high'|'medium'|'low'), recommended_action (string).",
        },
        { role: "user", content: JSON.stringify({ clauses, jurisdiction }) },
      ],
    });
    return extractTextContent(response);
  } catch (error) {
    if (error instanceof CohereError) {
      throw new CohereLlmError(error.message, error.statusCode);
    }
    if (error instanceof CohereTimeoutError) {
      throw new CohereLlmError(error.message);
    }
    throw error;
  }
}
Key design choices:
CohereClientV2 receives the API key explicitly via token: process.env.COHERE_API_KEY. When no token is passed, the SDK reads the CO_API_KEY environment variable instead, which the .env file from Step 1 does not define.
Both functions return raw strings (not parsed JSON) so the repair layer can handle whatever the LLM produces — including markdown fences, trailing commas, or conversational prose.
Every API call is wrapped in try/catch that converts CohereError and CohereTimeoutError into the application’s CohereLlmError, preserving status codes for debugging.
Expected output: A module exporting two async functions that call Cohere’s chat API with structured system prompts and return the raw text response.
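Both functions repeat the same try/catch mapping. If you add more Cohere calls, that mapping can be factored into one higher-order function. The sketch below is self-contained, so ProviderError and ProviderTimeoutError are hypothetical stand-ins for the SDK's CohereError and CohereTimeoutError, and withLlmErrors is an illustrative name rather than anything the project exports.

```typescript
// Hypothetical stand-ins for the SDK's CohereError and CohereTimeoutError.
class ProviderError extends Error {
  statusCode?: number;
  constructor(message: string, statusCode?: number) {
    super(message);
    this.statusCode = statusCode;
  }
}
class ProviderTimeoutError extends Error {}

// Same shape as CohereLlmError in src/lib/cohere.ts.
export class LlmError extends Error {
  code = "COHERE_LLM_ERROR" as const;
  statusCode?: number;
  constructor(message: string, statusCode?: number) {
    super(message);
    this.name = "LlmError";
    this.statusCode = statusCode;
  }
}

// Wrap any async LLM call so provider errors surface as LlmError with the HTTP status kept.
export function withLlmErrors<A extends unknown[], R>(
  fn: (...args: A) => Promise<R>,
): (...args: A) => Promise<R> {
  return async (...args: A): Promise<R> => {
    try {
      return await fn(...args);
    } catch (error) {
      if (error instanceof ProviderError) throw new LlmError(error.message, error.statusCode);
      if (error instanceof ProviderTimeoutError) throw new LlmError(error.message);
      throw error; // anything else is not a provider failure; rethrow unchanged
    }
  };
}
```

With this in place, each exported function only contains its cohere.chat call, and the error policy lives in one spot.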
Step 6: Create the JSON repair service
LLMs don't always produce valid JSON. The repair service uses @reaatech/structured-repair-core to fix malformed output and enforce Zod schemas. It lives in src/lib/repair.ts.
The repair() function from @reaatech/structured-repair-core handles common LLM output issues automatically — it strips markdown code fences, fixes trailing commas, converts single-quoted strings to double-quoted, removes hallucinated fields not in the schema, and recovers partial data when possible. The repairWithDiagnostics variant provides detailed step-by-step error information for debugging when repair fails.
Expected output: Zod schemas for policy clauses and compliance gaps, plus async functions that repair LLM output into validated typed arrays.
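To see what kinds of fixes are involved, here is a deliberately naive, dependency-free repair pass covering the same inputs the repair tests in Step 12 exercise: fenced output, trailing commas, single-quoted strings, and extra fields. It is not how @reaatech/structured-repair-core works internally and performs no schema validation; naiveRepair and pickKnownKeys are illustrative names.

```typescript
// Illustrative only: a naive repair pass, NOT @reaatech/structured-repair-core's algorithm.
const CLAUSE_KEYS = ["clause_text", "clause_type", "compliance_status", "confidence"];
const FENCE = "`".repeat(3); // three backticks
const FENCE_RE = new RegExp("^" + FENCE + "[a-zA-Z]*\\s*([\\s\\S]*?)\\s*" + FENCE + "$");

export function naiveRepair(raw: string): unknown {
  let s = raw.trim();
  // 1. Strip a surrounding markdown code fence (optionally tagged, e.g. json).
  const fence = s.match(FENCE_RE);
  if (fence) s = fence[1];
  // 2. Drop conversational prose around the outermost JSON array.
  const start = s.indexOf("[");
  const end = s.lastIndexOf("]");
  if (start !== -1 && end > start) s = s.slice(start, end + 1);
  // 3. Remove trailing commas before a closing brace or bracket.
  s = s.replace(/,\s*([}\]])/g, "$1");
  try {
    return JSON.parse(s);
  } catch {
    // 4. Last resort: turn 'single-quoted' strings into "double-quoted" ones.
    //    This breaks on apostrophes inside strings, one reason real libraries use tolerant parsers.
    const requoted = s.replace(/'([^'\\]*)'/g, (_m, inner: string) => JSON.stringify(inner));
    return JSON.parse(requoted); // throws SyntaxError if the input is still unrepairable
  }
}

// Keep only the fields the schema defines, dropping hallucinated extras.
export function pickKnownKeys(
  items: unknown,
  keys: readonly string[] = CLAUSE_KEYS,
): Record<string, unknown>[] {
  if (!Array.isArray(items)) throw new Error("Expected a JSON array");
  return items.map((item) =>
    Object.fromEntries(
      Object.entries(item as Record<string, unknown>).filter(([k]) => keys.includes(k)),
    ),
  );
}
```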
Step 7: Set up the Artifact Registry and Pipeline Executor
The REAA media-pipeline-mcp-core library provides ArtifactRegistry for managing document artifacts and PipelineExecutor for orchestrating multi-step extraction pipelines. Create src/lib/artifact-registry.ts:
ts
import { ArtifactRegistry, PipelineExecutor, type Provider } from "@reaatech/media-pipeline-mcp-core";

// Shared registry holding artifacts produced during extraction.
export const artifactRegistry = new ArtifactRegistry();

export function createPipelineExecutor(providers: Provider[]): PipelineExecutor {
  return new PipelineExecutor({
    providers,
    defaultStepTimeoutMs: 120_000, // 2 minutes per step
    onEvent(event) {
      // Log each pipeline event's type and step id.
      const eventRecord = event as Record<string, unknown>;
      console.log("[pipeline]", eventRecord.type, eventRecord.stepId);
    },
  });
}
The ArtifactRegistry singleton stores extraction results throughout the pipeline lifecycle. The createPipelineExecutor factory accepts an array of providers and logs pipeline events — each time a step starts, completes, or fails, the onEvent callback fires with the event type and step identifier.
Expected output: A singleton artifact registry and a factory function for creating pipeline executors with a 2-minute default step timeout.
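defaultStepTimeoutMs: 120_000 gives each step two minutes. How PipelineExecutor enforces that internally is not shown here; a common way to impose such a limit is to race the step against a timer. The sketch below assumes hypothetical names (withTimeout, StepTimeoutError) that are not exports of @reaatech/media-pipeline-mcp-core.

```typescript
// Hypothetical error raised when a step exceeds its time budget.
export class StepTimeoutError extends Error {
  constructor(stepId: string, ms: number) {
    super(`Step "${stepId}" timed out after ${ms} ms`);
    this.name = "StepTimeoutError";
  }
}

// Race a step's promise against a timer. The timer is always cleared so it cannot keep
// the process alive. The slow step is abandoned, not cancelled: it keeps running.
export async function withTimeout<T>(stepId: string, work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new StepTimeoutError(stepId, ms)), ms);
  });
  try {
    return await Promise.race([work, timeout]);
  } finally {
    clearTimeout(timer);
  }
}
```

True cancellation would require the step itself to accept an AbortSignal; racing only stops the caller from waiting.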
Step 8: Build the compliance service
The compliance service is the data access layer — it handles all database operations for documents, clauses, and compliance gaps. Create src/lib/compliance-service.ts:
ts
import { db } from "../db/index.js";
import { documents, policyClauses, complianceGaps } from "../db/schema.js";
import { eq, sql, count } from "drizzle-orm";

export async function saveClauses(
  documentId: number,
  clauses: {
    clause_text: string;
    clause_type: string;
    compliance_status: string;
    confidence: number;
  }[],
): Promise<number[]> {
  const values = clauses.map((c) =>
The service provides six operations:
saveClauses inserts extracted clauses linked to a document, converting confidence to a string because Drizzle types NUMERIC columns as strings to avoid precision loss.
saveComplianceGaps inserts gap analysis results.
getComplianceSummary uses GROUP BY queries to count clauses by compliance status and gaps by severity.
searchPolicies uses PostgreSQL full-text search (to_tsvector / plainto_tsquery) for keyword lookups.
getDocumentClauses retrieves all clauses for a given document.
updateDocumentStatus tracks the document lifecycle through pending → processing → completed/failed.
Expected output: A complete data access layer with insert, query, search, and update operations across all three tables.
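If you want to guarantee that documents only move through pending → processing → completed/failed in order, one option is a small transition table checked before each updateDocumentStatus call. This is a hypothetical addition: the DocumentStatus type, canTransition, and assertTransition are illustrative names, and allowing a failed document to be retried by moving it back to processing is an assumption.

```typescript
// Hypothetical guard for the document lifecycle described above.
export type DocumentStatus = "pending" | "processing" | "completed" | "failed";

// Allowed next states for each state. failed -> processing (a retry) is an assumption.
const TRANSITIONS: Record<DocumentStatus, readonly DocumentStatus[]> = {
  pending: ["processing"],
  processing: ["completed", "failed"],
  completed: [],
  failed: ["processing"],
};

export function canTransition(from: DocumentStatus, to: DocumentStatus): boolean {
  return TRANSITIONS[from].includes(to);
}

export function assertTransition(from: DocumentStatus, to: DocumentStatus): void {
  if (!canTransition(from, to)) {
    throw new Error(`Illegal status change: ${from} -> ${to}`);
  }
}
```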
Step 9: Build the extraction pipeline
The pipeline orchestrates the full extraction flow. Create src/pipeline/policy-extractor.ts:
ts
import { ArtifactRegistry, type PipelineDefinition } from "@reaatech/media-pipeline-mcp-core";
export { PipelineExecutor, BudgetExceededError } from "@reaatech/media-pipeline-mcp-core";
import { createDocumentExtractionOperations } from "@reaatech/media-pipeline-mcp-doc-extraction";
import { extractPolicyClauses, identifyComplianceGaps } from "../lib/cohere.js";
import { repairClauseArray, repairGapArray, type PolicyClause, type ComplianceGap } from "../lib/repair.js";
import { saveClauses, saveComplianceGaps, updateDocumentStatus } from "../lib/compliance-service.js";

// Initialize the doc-extraction operations with an artifact registry.
export const registry = new ArtifactRegistry();
export const dummyStorage = {
The pipeline flow:
Sets document status to processing
Returns early with empty results for blank documents
Calls Cohere to extract raw clause JSON, then repairs the output into typed PolicyClause[]
Groups clauses by type and runs compliance gap analysis per type
Repairs gap output into typed ComplianceGap[]
Saves everything to the database
Sets status to completed — or failed if anything throws
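The flow above can be sketched without any of the real dependencies by injecting them as functions, which also makes it easy to test. Everything here is hypothetical: the PipelineDeps interface, the runPolicyPipeline name, and the simplified clause and gap shapes stand in for the Cohere, repair, and compliance-service calls; the jurisdiction is passed in as a parameter, and marking a blank document as completed is an assumption.

```typescript
// Simplified shapes; the real PolicyClause and ComplianceGap come from src/lib/repair.ts.
type Clause = { clause_text: string; clause_type: string };
type Gap = { requirement: string; severity: "high" | "medium" | "low" };
type Status = "processing" | "completed" | "failed";

// Hypothetical dependencies standing in for cohere.ts, repair.ts and compliance-service.ts.
export interface PipelineDeps {
  setStatus(documentId: number, status: Status): Promise<void>;
  extractClauses(text: string): Promise<string>; // raw LLM output
  repairClauses(raw: string): Promise<Clause[]>;
  findGaps(clauses: Clause[], jurisdiction: string): Promise<string>; // raw LLM output
  repairGaps(raw: string): Promise<Gap[]>;
  save(documentId: number, clauses: Clause[], gaps: Gap[]): Promise<void>;
}

export async function runPolicyPipeline(
  deps: PipelineDeps,
  documentId: number,
  text: string,
  jurisdiction: string,
): Promise<{ clauses: Clause[]; gaps: Gap[] }> {
  await deps.setStatus(documentId, "processing");
  try {
    // Blank document: nothing to analyse (treated as completed in this sketch).
    if (text.trim() === "") {
      await deps.setStatus(documentId, "completed");
      return { clauses: [], gaps: [] };
    }
    const clauses = await deps.repairClauses(await deps.extractClauses(text));

    // Group clauses by type so each gap-analysis call covers one policy area.
    const byType = new Map<string, Clause[]>();
    for (const clause of clauses) {
      const group = byType.get(clause.clause_type) ?? [];
      group.push(clause);
      byType.set(clause.clause_type, group);
    }
    const gaps: Gap[] = [];
    for (const group of byType.values()) {
      gaps.push(...(await deps.repairGaps(await deps.findGaps(group, jurisdiction))));
    }

    await deps.save(documentId, clauses, gaps);
    await deps.setStatus(documentId, "completed");
    return { clauses, gaps };
  } catch (error) {
    await deps.setStatus(documentId, "failed");
    throw error;
  }
}
```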
The createDefinition function builds a PipelineDefinition for use with a PipelineExecutor, including a $1.00 budget cap that aborts the run when exceeded.
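The cap itself is enforced by @reaatech/media-pipeline-mcp-core, which exports the BudgetExceededError that policy-extractor.ts re-exports. The accounting behind a cap like this is simple to sketch. BudgetTracker, CostCapExceededError, and the per-call costs in the usage are hypothetical and say nothing about how the library actually computes cost.

```typescript
// Hypothetical cost accounting behind a per-run budget cap.
// Amounts are tracked in integer micro-dollars to avoid floating-point drift.
export class CostCapExceededError extends Error {
  constructor(spentMicros: number, capMicros: number) {
    super(`Budget exceeded: ${spentMicros / 1e6} USD of ${capMicros / 1e6} USD`);
    this.name = "CostCapExceededError";
  }
}

export class BudgetTracker {
  private spentMicros = 0;
  private readonly capMicros: number;

  constructor(capUsd: number) {
    this.capMicros = Math.round(capUsd * 1e6);
  }

  // Record a step's cost and abort (throw) once the running total passes the cap.
  charge(costUsd: number): void {
    this.spentMicros += Math.round(costUsd * 1e6);
    if (this.spentMicros > this.capMicros) {
      throw new CostCapExceededError(this.spentMicros, this.capMicros);
    }
  }

  get remainingUsd(): number {
    return Math.max(0, this.capMicros - this.spentMicros) / 1e6;
  }
}
```

For example, with new BudgetTracker(1.0), charges of 0.3, 0.3, and 0.4 land exactly on the cap, and the next charge of any positive amount throws.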
Expected output: An orchestrated pipeline that processes a document from raw text through LLM extraction, JSON repair, compliance analysis, and database persistence.
Step 10: Create the API routes
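The application exposes four API endpoints. Start with the upload route at src/api/upload/route.ts. (The Next.js App Router only serves route.ts files inside the app directory, app/ or src/app/, so handlers kept in src/api need to be exposed from matching files under src/app/api, with relative imports adjusted.)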
The pipeline runs asynchronously — the upload route returns HTTP 202 immediately with { id, status: "pending" } so the client doesn’t block on LLM calls. The .catch() handler ensures the document status is set to "failed" if something goes wrong.
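The respond-now, process-later pattern can be sketched framework-free. acceptUpload, runPipeline, and markFailed are hypothetical names; in the real route, runPipeline would start the extraction pipeline and markFailed would set the document status to "failed". One caveat: on serverless hosts, work that continues after the response is sent is not guaranteed to finish, so a job queue or the platform's background-task mechanism is safer there.

```typescript
type Accepted = { id: number; status: "pending" };

// Hypothetical helper: start the pipeline without awaiting it and return the
// HTTP 202 body immediately. Failures go to markFailed instead of becoming
// unhandled promise rejections.
export function acceptUpload(
  id: number,
  runPipeline: (id: number) => Promise<void>,
  markFailed: (id: number, error: unknown) => Promise<void>,
): Accepted {
  void runPipeline(id).catch((error: unknown) =>
    markFailed(id, error).catch((markError: unknown) => {
      // Last resort: recording the failure itself failed (e.g. database unavailable).
      console.error(`Could not mark document ${id} as failed`, markError);
    }),
  );
  return { id, status: "pending" };
}
```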
Next, the document status route at src/api/documents/[id]/route.ts:
ts
import { NextRequest, NextResponse } from "next/server";
import { db } from "../../../db/index.js";
import { documents, policyClauses } from "../../../db/schema.js";
import { eq } from "drizzle-orm";

export async function GET(
  _req: NextRequest,
  { params }: { params: Promise<{ id: string }> },
) {
  const { id } = await params;
  const docId = Number(id);
  if (Number.isNaN(docId)) {
    return NextResponse.json({ error: "Invalid document ID" }, { status: 400 });
  }

  const rows = await db
    .select()
    .from(documents)
    .where(eq(documents.id, docId))
    .limit(1);
  if (rows.length === 0) {
    return NextResponse.json({ error: "Document not found" }, { status: 404 });
  }
  const document = rows[0];

  const clauses = await db
    .select()
    .from(policyClauses)
    .where(eq(policyClauses.document_id, docId))
    .limit(100);

  return NextResponse.json({ document, clauses });
}
This route uses the async params pattern introduced in Next.js 15 and required in Next.js 16: params is typed as Promise<{ id: string }>, and the handler awaits it before reading the dynamic segment.
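Number(id) accepts strings such as "1.5" (which then falls through to a 404) or "1e3" (which silently becomes document 1000). If you would rather answer 400 for anything that is not a canonical positive integer, a stricter parser is a small change. parseDocumentId is a hypothetical helper, not part of the project.

```typescript
// Hypothetical stricter alternative to Number(id) for the [id] route segment.
// Accepts only canonical positive integers within the PostgreSQL SERIAL (int4) range.
const MAX_SERIAL = 2_147_483_647;

export function parseDocumentId(raw: string): number | null {
  // Rejects "", "0", "007", "1.5", "1e3", " 42", "-1" and anything non-numeric.
  if (!/^[1-9]\d*$/.test(raw)) return null;
  const id = Number(raw);
  return id <= MAX_SERIAL ? id : null;
}
```

In the route, the NaN check would become: const docId = parseDocumentId(id); if (docId === null) return a 400 response.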
The compliance summary route lives at src/api/compliance/route.ts.
And the policy search route at src/api/search/route.ts:
ts
import { NextRequest, NextResponse } from "next/server";
import { searchPolicies } from "../../lib/compliance-service.js";

export async function GET(req: NextRequest) {
  const q = req.nextUrl.searchParams.get("q");
  if (!q) {
    return NextResponse.json([]);
  }
  const results = await searchPolicies(q);
  return NextResponse.json(results);
}
Expected output: Four API route handlers — upload (POST/GET), document status (GET), compliance summary (GET), and policy search (GET) — all using NextRequest/NextResponse.
Step 11: Set up observability with Langfuse
Langfuse provides tracing for LLM calls. Create src/lib/observability.ts:
ts
import { Langfuse } from "langfuse";

export const langfuse = new Langfuse({
  publicKey: process.env.LANGFUSE_PUBLIC_KEY,
  secretKey: process.env.LANGFUSE_SECRET_KEY,
  baseUrl: process.env.LANGFUSE_HOST,
});

// Wrap an async LLM call in a Langfuse generation that is ended on success and on failure.
export async function traceLlamaCall<T>(
  name: string,
  callback: () => Promise<T>,
): Promise<T> {
  const gen = langfuse.generation({ name });
  try {
    const result = await callback();
    gen.end();
    return result;
  } catch (error) {
    gen.end();
    throw error;
  }
}
Then wire the Langfuse initialization into Next.js instrumentation at src/instrumentation.ts:
ts
export async function register() {
  if (process.env.NEXT_RUNTIME === "nodejs") {
    await import("./lib/observability.js");
  }
}
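The register() function runs once when a Next.js server instance starts. instrumentation.ts has been stable since Next.js 15, so the experimental.instrumentationHook flag in next.config.ts is no longer needed; it only applies to Next.js 14 and earlier. Checking NEXT_RUNTIME === "nodejs" and loading the module with a dynamic import() ensures the Langfuse client is only loaded in the Node.js runtime, never in the Edge runtime.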
Expected output: Langfuse client initialization at server startup, plus a reusable traceLlamaCall helper that wraps any async function with a Langfuse generation trace.
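traceLlamaCall ends the generation on both paths, but records neither whether the call succeeded nor how long it took. The sketch below shows the same wrap-and-always-report shape with a pluggable sink, so it runs without Langfuse. The Span type, traced, and the sink callback are hypothetical; with the real client you would pass the equivalent details (for example an error level and status message) to gen.end(), which is worth confirming against the Langfuse SDK reference.

```typescript
// Hypothetical record of one traced call.
export type Span = { name: string; ok: boolean; durationMs: number; error?: string };

// Wrap an async call and report a span on both the success and the failure path.
export async function traced<T>(
  name: string,
  callback: () => Promise<T>,
  sink: (span: Span) => void,
): Promise<T> {
  const start = Date.now();
  try {
    const result = await callback();
    sink({ name, ok: true, durationMs: Date.now() - start });
    return result;
  } catch (error) {
    sink({ name, ok: false, durationMs: Date.now() - start, error: String(error) });
    throw error;
  }
}
```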
Step 12: Run the tests
The project includes a comprehensive test suite covering every module. Before running tests, install dependencies and set up your environment:
terminal
pnpm install
pnpm db:push
pnpm typecheck
pnpm lint
pnpm test
The test setup at tests/setup.ts uses MSW (Mock Service Worker) to intercept Cohere API calls:
ts
import { setupServer } from "msw/node";
import { http, HttpResponse } from "msw";
import { afterAll, afterEach, beforeAll } from "vitest";

const cohereResponse = {
  id: "test-response",
  message: {
    content: [{ type: "text" as const, text: "Here are the extracted clauses." }],
  },
};

export const server = setupServer(
  http.post("https://api.cohere.com/v2/chat", () => HttpResponse.json(cohereResponse)),
);

beforeAll(() => {
  server.listen({ onUnhandledRequest: "error" });
});
afterEach(() => {
  server.resetHandlers();
});
afterAll(() => {
  server.close();
});
The onUnhandledRequest: "error" option catches any accidental live HTTP calls during tests — if your test accidentally makes a real API request, the test fails immediately rather than silently succeeding against production.
Tests cover every layer. For example, the repair tests verify that repairClauseArray handles clean JSON, markdown code fences, trailing commas, hallucinated fields, and completely invalid input:
ts
import { describe, it, expect } from "vitest";
import {
  repairClauseArray,
  repairGapArray,
  repairWithDiagnostics,
  PolicyClauseSchema,
} from "../../src/lib/repair.js";
import { UnrepairableError } from "@reaatech/structured-repair-core";

describe("repair", () => {
  it("repairClauseArray produces valid typed data from clean JSON input", async () => {
    const input = JSON.stringify([
      {
        clause_text: "Employees are entitled to 10 sick days per year",
        clause_type: "sick_leave" as const,
        compliance_status: "compliant" as const,
        confidence: 0.95,
      },
    ]);
    const result = await repairClauseArray(input);
    expect(result).toHaveLength(1);
    expect(result[0].clause_text).toBe("Employees are entitled to 10 sick days per year");
    expect(result[0].clause_type).toBe("sick_leave");
    expect(result[0].compliance_status).toBe("compliant");
    expect(result[0].confidence).toBe(0.95);
  });

  it("repairClauseArray fixes markdown code fences", async () => {
    const input = '```json\n[{"clause_text":"Test","clause_type":"other","compliance_status":"compliant","confidence":0.8}]\n```';
    const result = await repairClauseArray(input);
    expect(result).toHaveLength(1);
    expect(result[0].clause_text).toBe("Test");
  });

  it("repairClauseArray recovers from trailing commas and single-quoted strings", async () => {
    const input = "[{'clause_text': 'test', 'clause_type': 'other', 'compliance_status': 'compliant', 'confidence': 0.8,}]";
    const result = await repairClauseArray(input);
    expect(result).toHaveLength(1);
    expect(result[0].clause_text).toBe("test");
  });

  it("repairClauseArray throws UnrepairableError on completely invalid input", async () => {
    await expect(repairClauseArray("not json at all")).rejects.toThrow(UnrepairableError);
  });

  it("repairClauseArray strips extra hallucinated fields not in schema", async () => {
    const input = JSON.stringify([
      {
        clause_text: "Test",
        clause_type: "other",
        compliance_status: "compliant",
        confidence: 0.8,
        extra_field: "should be removed",
      },
    ]);
    const result = await repairClauseArray(input);
    expect(result).toHaveLength(1);
    expect("extra_field" in result[0]).toBe(false);
  });

  it("repairClauseArray handles empty array", async () => {
    const result = await repairClauseArray("[]");
    expect(result).toEqual([]);
  });
});
Expected output: pnpm test reports all 90 tests passing and 0 failures.
Next steps
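Add embedding search — Use the existing pgvector embedding column to implement semantic clause search: compute embeddings with Cohere's embed endpoint, then query via ORDER BY embedding <=> query_vector LIMIT 10. Use the cosine-distance operator <=> rather than <-> (Euclidean distance) so the query can use the vector_cosine_ops HNSW index.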
Build a notification system — When the pipeline identifies high-severity compliance gaps, send email or Slack alerts, for example by posting to a Slack incoming webhook.
Support more document formats — Extend parseDocument to handle .doc, .txt, and scanned PDFs (add OCR via @reaatech/media-pipeline-mcp-doc-extraction).
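For the notification idea, deciding what to send is independent of the delivery channel. A minimal sketch, assuming Slack incoming webhooks (which accept a JSON body with a text field); buildGapAlert is a hypothetical name and the message format is arbitrary.

```typescript
// Minimal gap shape; mirrors the columns stored in compliance_gaps.
type Gap = {
  jurisdiction: string;
  requirement: string;
  gap_description: string;
  severity: "high" | "medium" | "low";
  recommended_action: string;
};

// Build a Slack incoming-webhook payload listing high-severity gaps, or null if there are none.
export function buildGapAlert(documentTitle: string, gaps: readonly Gap[]): { text: string } | null {
  const high = gaps.filter((g) => g.severity === "high");
  if (high.length === 0) return null;
  const lines = high.map(
    (g) => `- [${g.jurisdiction}] ${g.requirement}: ${g.gap_description} (action: ${g.recommended_action})`,
  );
  return { text: `${high.length} high-severity gap(s) in "${documentTitle}":\n${lines.join("\n")}` };
}

// Sending it (not executed here):
// await fetch(webhookUrl, { method: "POST", headers: { "Content-Type": "application/json" },
//   body: JSON.stringify(payload) });
```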
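// src/lib/compliance-service.ts (continued from Step 8): this object literal is the
// body of the clauses.map((c) => ...) callback that begins saveClauses.
({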
document_id: documentId,
clause_text: c.clause_text,
clause_type: c.clause_type,
compliance_status: c.compliance_status,
confidence: String(c.confidence),
}));
const result = await db
.insert(policyClauses)
.values(values)
.returning({ id: policyClauses.id });
return result.map((r) => r.id);
}
export async function saveComplianceGaps(
gaps: {
jurisdiction: string;
requirement: string;
current_policy: string;
gap_description: string;
severity: string;
recommended_action: string;
}[],
): Promise<number[]> {
const result = await db
.insert(complianceGaps)
.values(gaps)
.returning({ id: complianceGaps.id });
return result.map((r) => r.id);
}
export async function getComplianceSummary(): Promise<{