SMB teams waste hours digging through Gmail threads and Drive files to find critical information. An AI knowledge agent that indexes Workspace and answers queries with source links would drastically cut search time.
This tutorial walks you through building an AI-powered knowledge search pipeline for Google Workspace that lets you ask questions in plain English and get answers with citations from your Gmail and Drive content. The pipeline ingests emails and documents nightly, generates embeddings with VoyageAI, stores vectors in PGVector, and answers questions via Anthropic’s Claude with streaming, semantic caching, session continuity, and cost telemetry. Quality gates from the REAA evaluation stack catch retrieval regressions before users notice them.
Prerequisites
Node.js 22+ and pnpm 10+
A Google Cloud service account with Gmail readonly and Drive readonly scopes, plus domain-wide delegation configured in your Google Workspace Admin Console
An Anthropic API key (claude-sonnet-4-6 access)
A VoyageAI API key for embeddings
PostgreSQL 16+ with the pgvector extension installed
Langfuse account (optional, for observability)
Basic familiarity with Next.js App Router, TypeScript, and PostgreSQL
Step 1: Create the Project Scaffold
Start by creating the Next.js project and installing all dependencies.
Check your package.json has no ^ or ~ in version strings:
terminal
grep -n '"[~^>]' package.json
This should return nothing. Every version is exact-pinned.
Expected output: Your package.json dependencies section lists every package with an exact version like "0.1.0", not "^0.1.0".
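The grep check can also be expressed in code, which makes it easy to run in CI. The following is a minimal sketch under stated assumptions: findUnpinned and the sample manifest (including its version numbers) are hypothetical and exist only for illustration.

```typescript
// Hypothetical helper: list dependencies whose version is not an exact pin.
type Manifest = {
  dependencies?: Record<string, string>;
  devDependencies?: Record<string, string>;
};

// Exact pins look like "1.2.3" or "1.2.3-beta.1"; anything else is treated as a range.
const EXACT_VERSION = /^\d+\.\d+\.\d+(-[0-9A-Za-z.-]+)?$/;

function findUnpinned(manifest: Manifest): string[] {
  const all = { ...manifest.dependencies, ...manifest.devDependencies };
  return Object.entries(all)
    .filter(([, version]) => !EXACT_VERSION.test(version))
    .map(([name, version]) => `${name}@${version}`);
}

// Illustrative manifest; the versions are made up.
const sampleManifest: Manifest = {
  dependencies: { zod: "3.23.8", postgres: "^3.4.4" },
  devDependencies: { typescript: "~5.6.2" },
};

console.log(findUnpinned(sampleManifest)); // ["postgres@^3.4.4", "typescript@~5.6.2"]
```

An empty array means every version is exact-pinned.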
Step 2: Configure Environment Variables
Create an .env.example file that documents every variable the pipeline needs:
env
# Env vars used by anthropic-rag-pipeline-for-google-workspace-smb-email-knowledge-search.
# The builder adds entries here as it wires up each integration.
# Keep placeholders only; never commit real values.
NODE_ENV=development
ANTHROPIC_API_KEY=<your-anthropic-key>
VOYAGE_API_KEY=<your-voyage-key>
DATABASE_URL=postgres://user:***@host:5432/dbname
GOOGLE_CLIENT_EMAIL=<your-service-account-email>
GOOGLE_PRIVATE_KEY=<your-service-account-private-key>
LANGFUSE_PUBLIC_KEY=<your-langfuse-public-key>
LANGFUSE_SECRET_KEY=<your-langfuse-secret-key>
GOOGLE_DELEGATED_USER=<admin@yourdomain.com>
NEXT_PUBLIC_APP_URL=http://localhost:3000
Copy it and fill in real values:
terminal
cp .env.example .env.local
Expected output: A .env.local file with your actual API keys and credentials. Never commit this file.
Step 3: Define Shared Domain Types
Create src/types/index.ts with the types used across every module:
Expected output: A clean TypeScript file with no type errors. Run pnpm typecheck to confirm.
Step 4: Create the Environment Configuration
Create src/config/env.ts that validates all environment variables at startup using Zod:
ts
import { z } from "zod";

const envSchema = z.object({
  ANTHROPIC_API_KEY: z.string().min(1),
  VOYAGE_API_KEY: z.string().min(1),
  DATABASE_URL: z.string().min(1),
  GOOGLE_CLIENT_EMAIL: z.string().min(1),
  GOOGLE_PRIVATE_KEY: z.string().min(1),
  LANGFUSE_PUBLIC_KEY: z.string().optional(),
  LANGFUSE_SECRET_KEY: z.string().optional(),
  GOOGLE_DELEGATED_USER: z.string().optional().default(""),
  NEXT_PUBLIC_APP_URL: z.string().optional().default("http://localhost:3000"),
  NODE_ENV: z.string().optional().default("development"),
});

export type ParsedEnv = z.infer<typeof envSchema>;

let parsedEnv: ParsedEnv | undefined;

export function getEnv(): ParsedEnv {
  if (parsedEnv) {
    return parsedEnv;
  }
  const result = envSchema.safeParse(process.env);
  if (!result.success) {
    const missing = result.error.issues
      .map((i) => i.path.join("."))
      .join(", ");
    throw new Error(`Invalid environment variables: ${missing}`);
  }
  parsedEnv = result.data;
  return parsedEnv;
}

export const env: ParsedEnv = new Proxy({} as ParsedEnv, {
  get(_, prop: string) {
    const e = getEnv();
    return (e as Record<string, unknown>)[prop];
  },
});
The Proxy pattern lets you access env.ANTHROPIC_API_KEY anywhere without calling getEnv() first — it lazy-initializes on first property access.
Expected output: pnpm typecheck passes. If you remove DATABASE_URL from your .env.local, this module throws a clear error at first access.
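To see the lazy-initialization behavior in isolation, here is a dependency-free sketch of the same Proxy idea. It is not the module above: Config, loadConfig, fakeEnv, and loadCount are hypothetical names, and a hand-written check stands in for the Zod schema.

```typescript
// Sketch of a validate-once, load-on-first-access config object.
type Config = { API_KEY: string; PORT: string };

let loadCount = 0; // counts how many times validation actually runs
let cached: Config | undefined;

function loadConfig(source: Record<string, string | undefined>): Config {
  if (cached) return cached;
  loadCount += 1;
  const { API_KEY, PORT = "3000" } = source;
  if (!API_KEY) throw new Error("Invalid environment variables: API_KEY");
  cached = { API_KEY, PORT };
  return cached;
}

// Stand-in for process.env so the example runs anywhere.
const fakeEnv: Record<string, string | undefined> = { API_KEY: "test-key" };

// Nothing is validated until a property is read through the proxy.
const config = new Proxy({} as Config, {
  get(_target, prop: string) {
    return loadConfig(fakeEnv)[prop as keyof Config];
  },
});

const loadsBeforeAccess = loadCount; // 0: the proxy exists, nothing has loaded
const port = config.PORT;            // first access triggers validation
const apiKey = config.API_KEY;       // second access reuses the cached result
console.log(loadsBeforeAccess, port, apiKey, loadCount); // 0 3000 test-key 1
```

The trade-off is that a missing variable surfaces at first property access rather than at import time, so a smoke test that touches the config early is a reasonable companion.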
Step 5: Set Up the Database Connection
Create src/db/connection.ts — a singleton Postgres connection using the postgres.js library with PGVector support:
ts
import postgres from "postgres";
import { env } from "../config/env.js";

let sql: postgres.Sql | null = null;

export function getDb(): postgres.Sql {
  if (sql) {
    return sql;
  }
  sql = postgres(env.DATABASE_URL, {
    max: 5,
    idle_timeout: 20,
    connect_timeout: 10,
  });
  return sql;
}

export async function ensureSchema(): Promise<void> {
  const db = getDb();
  await db`CREATE EXTENSION IF NOT EXISTS vector`;
  // The vector dimension must match the embedder's output size:
  // voyage-3-lite (Step 10) returns 512-dimensional vectors.
  await db`
    CREATE TABLE IF NOT EXISTS chunks (
      id bigserial PRIMARY KEY,
      source_id text NOT NULL,
      source_type text NOT NULL,
      chunk_index integer NOT NULL,
      text text NOT NULL,
      embedding vector(512) NOT NULL,
      metadata jsonb DEFAULT '{}'::jsonb,
      created_at timestamptz DEFAULT now()
    )
  `;
  await db`CREATE INDEX IF NOT EXISTS idx_chunks_source ON chunks (source_id, source_type)`;
}

export async function createSearchIndex(): Promise<void> {
  const db = getDb();
  await db`CREATE INDEX IF NOT EXISTS idx_chunks_embedding ON chunks USING hnsw (embedding vector_cosine_ops)`;
}

export async function closeDb(): Promise<void> {
  if (sql) {
    await sql.end({ timeout: 5 });
    sql = null;
  }
}
The chunks table stores document text alongside 512-dimensional embedding vectors, matching the output size of the voyage-3-lite model used in Step 10. The HNSW index enables fast approximate nearest-neighbor search using cosine distance.
Expected output: pnpm typecheck passes. The ensureSchema() function creates the vector extension and the chunks table when called against a running Postgres instance.
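For intuition about what the index ranks by, cosine distance is 1 minus the cosine similarity of two vectors. Below is a plain TypeScript sketch of that formula; cosineDistance is an illustrative name, and this is not how pgvector implements it internally.

```typescript
// Cosine distance: 0 for identical direction, 1 for orthogonal, 2 for opposite.
function cosineDistance(a: number[], b: number[]): number {
  if (a.length !== b.length) {
    throw new Error("Vectors must have the same dimension");
  }
  let dot = 0;
  let normA = 0;
  let normB = 0;
  a.forEach((x, i) => {
    const y = b[i] ?? 0;
    dot += x * y;
    normA += x * x;
    normB += y * y;
  });
  // A zero-length vector makes the denominator 0 and the result NaN.
  return 1 - dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

console.log(cosineDistance([3, 4], [3, 4]));  // 0 (same direction)
console.log(cosineDistance([1, 0], [0, 1]));  // 1 (orthogonal)
console.log(cosineDistance([1, 0], [-1, 0])); // 2 (opposite)
```

Because only direction matters, scaling a vector does not change its distance to another vector.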
Step 6: Build the Content Parser
Create src/ingestion/content-parser.ts that routes document content to the right parser based on MIME type:
ts
import mammoth from "mammoth";
import { NodeHtmlMarkdown } from "node-html-markdown";
import { extractText, getDocumentProxy } from "unpdf";

export async function parseDocx(buffer: Buffer): Promise<string> {
  const result = await mammoth.extractRawText({ buffer });
  return result.value;
}

export function parseHtml(html: string): string {
  return NodeHtmlMarkdown.translate(html);
}

export async function parsePdf(buffer: Uint8Array): Promise<string> {
  const pdf = await getDocumentProxy(buffer);
  const { text } = await extractText(pdf, { mergePages: true });
  return text;
}

export async function parseContent(
  content: string | Buffer,
  mimeType: string,
): Promise<string> {
  switch (mimeType) {
    case "text/html":
    case "text/html; charset=utf-8":
      return parseHtml(typeof content === "string" ? content : content.toString("utf-8"));
    case "application/pdf": {
      const data = content instanceof Buffer
        ? new Uint8Array(content)
        : new Uint8Array(Buffer.from(content));
      return await parsePdf(data);
    }
    case "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
      return await parseDocx(
        content instanceof Buffer ? content : Buffer.from(content),
      );
    default:
      return "";
  }
}
Expected output: HTML becomes clean markdown, DOCX extracts raw text, PDFs get merged-page extraction. Unknown MIME types return empty string — the ingestion loop skips them gracefully.
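The switch above matches MIME strings literally, so a variant such as "text/html; charset=UTF-8" would fall through to the default branch. One way to handle that is to strip parameters before routing. This is a sketch only; normalizeMimeType is a hypothetical helper, not part of the module above.

```typescript
// Hypothetical helper: reduce a Content-Type value to its lowercase type/subtype.
function normalizeMimeType(raw: string): string {
  const [essence = ""] = raw.split(";");
  return essence.trim().toLowerCase();
}

console.log(normalizeMimeType("text/html; charset=UTF-8")); // "text/html"
console.log(normalizeMimeType("Application/PDF"));          // "application/pdf"
console.log(normalizeMimeType("text/plain"));               // "text/plain"
```

Calling the parser with the normalized value would let a single "text/html" case cover every charset variant.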
Step 7: Create the Gmail Ingestion
Create src/ingestion/gmail-sync.ts that fetches recent emails using the Gmail API:
ts
import { google } from "googleapis";
import type { gmail_v1 } from "googleapis";
import { env } from "../config/env.js";
import type { EmailMessage } from "../types/index.js";
import { parseContent } from "./content-parser.js";

export class IngestionError extends Error {
  constructor(message: string, public readonly source: string) {
    super(message)
    this.name = "IngestionError"
  }
}

const auth = new google.auth.JWT({
  email: env.GOOGLE_CLIENT_EMAIL,
  key: env.GOOGLE_PRIVATE_KEY,
  scopes: ["https://www.googleapis.com/auth/gmail.readonly"],
  subject: env.GOOGLE_DELEGATED_USER || undefined,
});

const gmail = google.gmail({ version: "v1", auth });
The full fetchRecentEmails function lists messages from the last 24 hours, fetches each message in full format, extracts headers (subject, from, to, date), decodes the base64 body, and parses HTML bodies to markdown. If the body contains HTML parts, those are converted to clean text via the content parser. Attachments are noted in metadata.
Expected output: The module exports fetchRecentEmails which returns Promise<EmailMessage[]>. Each message has extracted headers and a plain-text body. Empty inbox returns [].
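The header extraction and body decoding described above can be sketched against a simplified message payload. The types below are a trimmed-down assumption modeled on the Gmail API's message part structure (headers as name/value pairs, body data encoded as base64url), and getHeader, decodeBody, and findPart are hypothetical helper names, not the contents of fetchRecentEmails.

```typescript
// Simplified stand-ins for the Gmail message payload shape.
type GmailHeader = { name?: string | null; value?: string | null };
type GmailPart = {
  mimeType?: string | null;
  headers?: GmailHeader[];
  body?: { data?: string | null };
  parts?: GmailPart[];
};

// Header names are case-insensitive, so compare in lowercase.
function getHeader(headers: GmailHeader[] | undefined, name: string): string {
  const wanted = name.toLowerCase();
  const match = (headers ?? []).find((h) => h.name?.toLowerCase() === wanted);
  return match?.value ?? "";
}

// Gmail encodes body data as URL-safe base64.
function decodeBody(data: string): string {
  return Buffer.from(data, "base64url").toString("utf-8");
}

// Depth-first search for the first part of a given MIME type that has data.
function findPart(part: GmailPart, mimeType: string): GmailPart | undefined {
  if (part.mimeType === mimeType && part.body?.data) return part;
  for (const child of part.parts ?? []) {
    const found = findPart(child, mimeType);
    if (found) return found;
  }
  return undefined;
}

// Illustrative payload built in code so the encoding is not hand-typed.
const samplePayload: GmailPart = {
  mimeType: "multipart/alternative",
  headers: [{ name: "Subject", value: "Q3 budget" }],
  parts: [
    {
      mimeType: "text/plain",
      body: { data: Buffer.from("Budget attached.").toString("base64url") },
    },
  ],
};

const subject = getHeader(samplePayload.headers, "subject");
const plainPart = findPart(samplePayload, "text/plain");
const bodyText = plainPart?.body?.data ? decodeBody(plainPart.body.data) : "";
console.log(subject, "|", bodyText); // Q3 budget | Budget attached.
```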
Step 8: Create the Drive Ingestion
Create src/ingestion/drive-sync.ts that fetches recently modified documents from Google Drive:
ts
import { google } from "googleapis";
import { env } from "../config/env.js";
import type { DriveDocument } from "../types/index.js";
import { parseContent } from "./content-parser.js";

const auth = new google.auth.JWT({
  email: env.GOOGLE_CLIENT_EMAIL,
  key: env.GOOGLE_PRIVATE_KEY,
  scopes: ["https://www.googleapis.com/auth/drive.readonly"],
  subject: env.GOOGLE_DELEGATED_USER || undefined,
});

const drive = google.drive({ version: "v3", auth });

const SUPPORTED_MIME_TYPES = [
  "application/vnd.google-apps.document",
  "application/vnd.google-apps.spreadsheet",
  "application/vnd.google-apps.presentation",
  "application/pdf",
  "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
  "text/plain",
  "text/csv",
];
The fetchRecentDocuments function lists files modified in the last day matching supported MIME types. Google-native files (Docs, Sheets) are exported as plain text. Binary files (PDF, DOCX) are downloaded via alt: media. All content goes through parseContent.
Expected output: The module exports fetchRecentDocuments returning Promise<DriveDocument[]>. Unsupported file types are skipped gracefully.
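Listing "files modified in the last day matching supported MIME types" comes down to a Drive search query string. The sketch below shows one plausible way to build it; buildDriveQuery is a hypothetical helper, and the trashed = false clause is an added assumption rather than something the tutorial states.

```typescript
// Hypothetical helper: build the q parameter for a Drive files.list call.
function buildDriveQuery(mimeTypes: string[], since: Date): string {
  const typeClause = mimeTypes.map((m) => `mimeType = '${m}'`).join(" or ");
  return `modifiedTime > '${since.toISOString()}' and (${typeClause}) and trashed = false`;
}

const query = buildDriveQuery(
  ["application/pdf", "text/plain"],
  new Date("2025-01-01T00:00:00.000Z"),
);
console.log(query);
// modifiedTime > '2025-01-01T00:00:00.000Z' and (mimeType = 'application/pdf' or mimeType = 'text/plain') and trashed = false
```

In the nightly job, the since argument would be the current time minus 24 hours.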
Step 9: Build the Ingestion Orchestrator
Create src/ingestion/index.ts that runs both sources in parallel and sends content through the embedding pipeline:
Expected output: runNightlyIngestion() runs both fetchers in parallel with Promise.allSettled. If one source fails, the other still processes. Returns a result object with counts and any errors.
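The failure-isolation behavior can be sketched with injected fetchers. This is not the orchestrator itself: runBothSources and the IngestionResult shape are assumptions for illustration, and the embedding step is left out.

```typescript
// Assumed result shape: counts per source plus collected error messages.
type IngestionResult = { emails: number; documents: number; errors: string[] };

async function runBothSources(
  fetchEmails: () => Promise<unknown[]>,
  fetchDocs: () => Promise<unknown[]>,
): Promise<IngestionResult> {
  // allSettled waits for both promises and never rejects,
  // so one failing source cannot abort the other.
  const [emails, docs] = await Promise.allSettled([fetchEmails(), fetchDocs()]);
  const errors: string[] = [];

  const count = (r: PromiseSettledResult<unknown[]>, label: string): number => {
    if (r.status === "fulfilled") return r.value.length;
    const reason = r.reason instanceof Error ? r.reason.message : String(r.reason);
    errors.push(`${label}: ${reason}`);
    return 0;
  };

  return { emails: count(emails, "gmail"), documents: count(docs, "drive"), errors };
}

// Usage: Gmail succeeds with two items while Drive fails.
runBothSources(
  async () => [{ id: "m1" }, { id: "m2" }],
  async () => {
    throw new Error("Drive quota exceeded");
  },
).then((result) => console.log(result));
// { emails: 2, documents: 0, errors: ["drive: Drive quota exceeded"] }
```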
Step 10: Create the VoyageAI Embedder
Create src/rag/embedder.ts that wraps the VoyageAI client:
ts
import { VoyageAIClient } from "voyageai"
import { env } from "../config/env.js"

let voyageClient: VoyageAIClient | undefined

function getClient(): VoyageAIClient {
  if (voyageClient) return voyageClient
  voyageClient = new VoyageAIClient({ apiKey: env.VOYAGE_API_KEY })
  return voyageClient
}

export async function generateEmbedding(text: string): Promise<number[]> {
  const client = getClient()
  const res = await client.embed({ input: text, model: "voyage-3-lite" })
  const item = res.data?.[0]
  if (!item || !item.embedding) {
    throw new Error("No embedding returned from VoyageAI")
  }
  return item.embedding
}

export async function generateEmbeddings(texts: string[]): Promise<number[][]> {
  // Skip the API call entirely when there is nothing to embed.
  if (texts.length === 0) return []
  const client = getClient()
  const res = await client.embed({ input: texts, model: "voyage-3-lite" })
  const embeddings: number[][] = []
  for (const item of res.data ?? []) {
    // Place each vector at its reported index so output order matches input order.
    if (item.embedding && item.index !== undefined) {
      embeddings[item.index] = item.embedding
    }
  }
  return embeddings
}
Expected output: generateEmbedding("text") returns a number[] of 512 dimensions (the output size of voyage-3-lite). generateEmbeddings(["a", "b"]) returns an array of two embeddings. Empty input returns an empty array.
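Embedding APIs generally cap how many inputs one request may carry, so large documents are usually embedded in batches. The sketch below shows the batching logic only. toBatches and embedInBatches are hypothetical helpers, the embedBatch callback stands in for a function like generateEmbeddings, and the default of 128 is a placeholder assumption: check the VoyageAI documentation for the current limit.

```typescript
// Split a list into consecutive batches of at most batchSize items.
function toBatches<T>(items: T[], batchSize: number): T[][] {
  if (batchSize < 1) throw new Error("batchSize must be at least 1");
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}

// Embed batch by batch, preserving input order in the combined result.
async function embedInBatches(
  texts: string[],
  embedBatch: (batch: string[]) => Promise<number[][]>,
  batchSize = 128, // placeholder; not a documented limit
): Promise<number[][]> {
  const vectors: number[][] = [];
  for (const batch of toBatches(texts, batchSize)) {
    vectors.push(...(await embedBatch(batch)));
  }
  return vectors;
}

console.log(toBatches([1, 2, 3, 4, 5], 2)); // [[1, 2], [3, 4], [5]]
```

Running batches sequentially keeps the request rate predictable, at the cost of some throughput.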
Step 11: Build the RAG Pipeline
Create src/rag/pipeline.ts — the heart of the RAG system that chunks text, generates embeddings, stores vectors in PGVector, and searches similar content:
ts
import { getDb } from "../db/connection.js"
import { generateEmbedding, generateEmbeddings } from "./embedder.js"
import pgvector from 'pgvector'
import type { DocumentChunk } from "../types/index.js"

export function chunkText(text: string, opts?: { chunkSize?: number; overlap?: number }): string[] {
  const chunkSize = opts?.chunkSize ?? 1000
  const overlap = opts?.overlap ?? 200
  const chunks: string[] = []
  let start = 0
  while (start < text.length) {
    const end = Math.min(start + chunkSize, text.length)
    chunks.push(text.slice(start, end))
    if (end === text.length) break
    start = end - overlap
  }
  return chunks
}

export async function embedAndStore(
  content: string,
  meta: { sourceId: string; sourceType: string; chunkIndex: number; metadata: Record<string, unknown> }
): Promise<void> {
  const sql = getDb()
  const chunks = chunkText(content)
  const embeddings = await generateEmbeddings(chunks)
  for (let i = 0; i < chunks.length; i++) {
    const embeddingSql = pgvector.toSql(embeddings[i])
    const metaJson: string = JSON.stringify(meta.metadata)
    await sql`
      INSERT INTO chunks (source_id, source_type, chunk_index, text, embedding, metadata)
      VALUES (${meta.sourceId}, ${meta.sourceType}, ${meta.chunkIndex + i}, ${chunks[i]}, ${embeddingSql}::vector, ${metaJson}::jsonb)
    `
  }
}

export async function searchSimilar(queryText: string, limit?: number): Promise<DocumentChunk[]> {
  const sql = getDb()
  const n = limit ?? 10
  const embedding = await generateEmbedding(queryText)
  const embeddingSql = pgvector.toSql(embedding)
  const rows = await sql`
    SELECT id, source_id, source_type, text, embedding, metadata,
           1 - (embedding <=> ${embeddingSql}::vector) AS relevance
    FROM chunks
    ORDER BY embedding <=> ${embeddingSql}::vector
    LIMIT ${n}
  `
  function toStr(val: unknown): string {
    if (typeof val === "string") return val;
    if (typeof val === "number" || typeof val === "boolean") return String(val);
    return "";
  }
  return rows.map((row: Record<string, unknown>) => ({
    id: toStr(row.id),
    sourceId: toStr(row.source_id),
    sourceType: toStr(row.source_type) as "email" | "drive",
    text: toStr(row.text),
    embedding: Array.isArray(row.embedding) ? (row.embedding as number[]) : [],
    metadata: typeof row.metadata === "object" && row.metadata !== null
      ? (row.metadata as Record<string, unknown>)
      : {},
    relevance: typeof row.relevance === "number" ? row.relevance : 0,
  }))
}
The chunkText function splits text into 1000-character chunks with a 200-character overlap, so text near a chunk boundary appears in both neighboring chunks instead of being cut off from its context. searchSimilar converts the query to an embedding, orders the stored chunks by cosine distance (the <=> operator), and returns the closest matches with a relevance score equal to 1 minus that distance.
Expected output: A 2500-character document splits into 3 chunks, starting at offsets 0, 800, and 1600. searchSimilar("budget report") returns the most semantically related chunks ranked by cosine similarity.
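The chunk boundaries for a given document length can be worked out without any text at all. The sketch below mirrors the loop in chunkText but returns offsets instead of strings. chunkOffsets is a hypothetical helper, and the guard against overlap >= chunkSize is an addition that the original loop does not have (without it, such settings would never advance).

```typescript
// Compute [start, end) offsets for fixed-size chunks with overlap.
function chunkOffsets(
  length: number,
  chunkSize = 1000,
  overlap = 200,
): Array<[number, number]> {
  if (overlap >= chunkSize) {
    throw new Error("overlap must be smaller than chunkSize");
  }
  const spans: Array<[number, number]> = [];
  let start = 0;
  while (start < length) {
    const end = Math.min(start + chunkSize, length);
    spans.push([start, end]);
    if (end === length) break;
    start = end - overlap; // step back so neighbors share `overlap` characters
  }
  return spans;
}

console.log(chunkOffsets(2500)); // [[0, 1000], [800, 1800], [1600, 2500]]
```

Each chunk after the first starts 800 characters after the previous one, which is the chunk size minus the overlap.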
Step 12: Add Semantic Caching
Create src/rag/cache.ts using the @reaatech/llm-cache package. This avoids re-querying Claude when the same or a semantically similar question is asked:
The cache uses a 0.85 cosine similarity threshold for semantic match detection. When you ask “What’s the Q3 budget?” and later ask “Tell me about Q3 budget”, the cache recognizes the semantic similarity and returns the cached answer instantly — no API call needed. TTLs are tiered: factual answers live for 24 hours, creative answers for 2 hours.
Expected output: Calling checkCache("budget report", "claude-sonnet-4-6") after a storeInCache returns { hit: true, type: "exact" }. An uncached query returns { hit: false }.
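To make the exact-versus-semantic distinction concrete, here is a toy in-memory cache built on the same ideas: an exact string match first, then a cosine-similarity comparison against a 0.85 threshold, with a TTL per entry. It is an illustration only and does not reflect the @reaatech/llm-cache API; ToySemanticCache, its methods, and the two-dimensional vectors are all made up for the example.

```typescript
type CacheEntry = { query: string; embedding: number[]; answer: string; expiresAt: number };
type CacheResult =
  | { hit: false }
  | { hit: true; type: "exact" | "semantic"; answer: string };

function cosineSimilarity(a: number[], b: number[]): number {
  let dot = 0;
  let normA = 0;
  let normB = 0;
  a.forEach((x, i) => {
    const y = b[i] ?? 0;
    dot += x * y;
    normA += x * x;
    normB += y * y;
  });
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

class ToySemanticCache {
  private entries: CacheEntry[] = [];
  private threshold: number;

  constructor(threshold = 0.85) {
    this.threshold = threshold;
  }

  store(query: string, embedding: number[], answer: string, ttlMs: number, now = Date.now()): void {
    this.entries.push({ query, embedding, answer, expiresAt: now + ttlMs });
  }

  check(query: string, embedding: number[], now = Date.now()): CacheResult {
    const live = this.entries.filter((e) => e.expiresAt > now);
    const exact = live.find((e) => e.query === query);
    if (exact) return { hit: true, type: "exact", answer: exact.answer };

    // Otherwise take the most similar live entry at or above the threshold.
    let best: CacheEntry | undefined;
    let bestScore = this.threshold;
    for (const entry of live) {
      const score = cosineSimilarity(entry.embedding, embedding);
      if (score >= bestScore) {
        best = entry;
        bestScore = score;
      }
    }
    return best ? { hit: true, type: "semantic", answer: best.answer } : { hit: false };
  }
}

const DAY_MS = 24 * 60 * 60 * 1000;
const cache = new ToySemanticCache();
cache.store("What's the Q3 budget?", [1, 0], "(cached answer)", DAY_MS, 0);

console.log(cache.check("What's the Q3 budget?", [1, 0], 1000).hit);       // true (exact)
console.log(cache.check("Tell me about Q3 budget", [0.9, 0.1], 1000).hit); // true (semantic, similarity about 0.99)
console.log(cache.check("Who owns the roadmap?", [0, 1], 1000).hit);       // false (similarity 0)
console.log(cache.check("What's the Q3 budget?", [1, 0], DAY_MS + 1).hit); // false (expired)
```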
Step 13: Implement Session Continuity
Create src/services/session.ts with an in-memory session manager:
ts
import {
  SessionManager,
  type IStorageAdapter,
  type TokenCounter,
  type Session,
  type SessionId,
  type Message,
  type MessageRole,
  type MessageContent,
  type SessionFilters,
  type MessageQueryOptions,
  type HealthStatus,
  type CreateSessionOptions,
  type CreateMessageOptions,
} from '@reaatech/session-continuity'

function generateId(): string {
  return `id-${String(Date.now())}-${Math.random().toString(
Expected output: A new session returns an object with id, status: 'active'. Adding messages increments a sequence number. Querying a nonexistent session throws SessionNotFoundError.
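The behavior described in the expected output can be illustrated with a small Map-backed store. This is a sketch of the idea, not the @reaatech/session-continuity API: ToySessionStore, its method names, and the locally defined SessionNotFoundError are assumptions for the example.

```typescript
// Local stand-in for the library's error type.
class SessionNotFoundError extends Error {
  constructor(sessionId: string) {
    super(`Session not found: ${sessionId}`);
    this.name = "SessionNotFoundError";
  }
}

type ToyMessage = { sequence: number; role: "user" | "assistant"; content: string };
type ToySession = { id: string; status: "active"; messages: ToyMessage[] };

class ToySessionStore {
  private sessions = new Map<string, ToySession>();
  private nextId = 1;

  create(): ToySession {
    const session: ToySession = { id: `session-${this.nextId++}`, status: "active", messages: [] };
    this.sessions.set(session.id, session);
    return session;
  }

  get(sessionId: string): ToySession {
    const session = this.sessions.get(sessionId);
    if (!session) throw new SessionNotFoundError(sessionId);
    return session;
  }

  // Each message gets the next sequence number within its session.
  addMessage(sessionId: string, role: ToyMessage["role"], content: string): ToyMessage {
    const session = this.get(sessionId);
    const message: ToyMessage = { sequence: session.messages.length + 1, role, content };
    session.messages.push(message);
    return message;
  }
}

const store = new ToySessionStore();
const session = store.create();
const first = store.addMessage(session.id, "user", "What was the Q3 revenue?");
const second = store.addMessage(session.id, "assistant", "(answer)");
console.log(session.status, first.sequence, second.sequence); // active 1 2
```

Because the Map lives in process memory, all sessions are lost on restart.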
Step 14: Add Cost Telemetry
Create src/services/cost-telemetry.ts to track every LLM call’s cost:
Claude’s pricing for this model is $3.00 per million input tokens and $15.00 per million output tokens. Each call to recordLlmCall produces a CostSpan with the computed dollar cost.
Expected output: recordLlmCall("anthropic", "claude-sonnet-4-6", 500, 100) returns a span with costUsd: 0.003. Multiple calls accumulate in the span array.
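The cost figure follows directly from the per-million-token prices quoted above: 500 input tokens cost $0.0015 and 100 output tokens cost $0.0015, for $0.003 in total. A minimal sketch of that arithmetic, where computeCostUsd is a hypothetical helper rather than the recordLlmCall implementation:

```typescript
// Prices from the text above, in USD per one million tokens.
const PRICE_PER_MILLION_USD = { input: 3.0, output: 15.0 };

function computeCostUsd(inputTokens: number, outputTokens: number): number {
  return (
    (inputTokens * PRICE_PER_MILLION_USD.input +
      outputTokens * PRICE_PER_MILLION_USD.output) /
    1_000_000
  );
}

console.log(computeCostUsd(500, 100));     // 0.003
console.log(computeCostUsd(1_000_000, 0)); // 3
```

Summing before dividing keeps the intermediate values as whole numbers, which avoids most floating-point noise in the result.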
Step 15: Configure Langfuse Observability
Create src/instrumentation.ts so Langfuse initializes when the Node.js server starts:
The register() function runs once when the Next.js server boots. The NEXT_RUNTIME guard ensures it only runs in Node.js (not Edge). The dynamic import() of langfuse prevents Edge runtime module-load failures.
Expected output: Starting pnpm dev triggers register(). When env vars are set, Langfuse initializes without errors. When running on Edge, it silently skips.
Step 16: Create the Chat API Route
Create app/api/chat/route.ts — the streaming chat endpoint that ties everything together:
ts
import { type NextRequest, NextResponse } from "next/server";
import Anthropic from "@anthropic-ai/sdk";
import { env } from "../../../src/config/env.js";
import { searchSimilar } from "../../../src/rag/pipeline.js";
import { checkCache, storeInCache } from "../../../src/rag/cache.js";
import { recordLlmCall } from "../../../src/services/cost-telemetry.js";
import { initSessionManager, addTurn } from "../../../src/services/session.js";

let anthropicClient: Anthropic | null = null;

function getAnthropic(): Anthropic {
  if (anthropicClient)
The flow: parse the query → check semantic cache → on miss, search PGVector for relevant chunks → build a prompt with those chunks as context → stream Claude’s response as Server-Sent Events → store in cache + session for follow-ups.
Expected output: A POST to /api/chat with {"query": "What was the Q3 revenue?"} returns an SSE stream. Each chunk is data: {"text": "..."} followed by a final data: {"done": true, "citations": [...], "sources": [...]}. A repeat of the same question hits the cache and returns instantly as JSON.
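The wire format described above is plain Server-Sent Events: each event is a data: line carrying JSON, terminated by a blank line. The sketch below shows one way to produce those frames. encodeSseEvent and toSseFrames are hypothetical helpers, and typing citations and sources as string arrays is an assumption, since the tutorial does not show their shape.

```typescript
// Assumed event shapes, based on the expected output described above.
type ChatEvent =
  | { text: string }
  | { done: true; citations: string[]; sources: string[] };

// One SSE frame: a "data:" line followed by a blank line.
function encodeSseEvent(event: ChatEvent): string {
  return `data: ${JSON.stringify(event)}\n\n`;
}

// Turn a list of text deltas into frames, ending with the "done" event.
function toSseFrames(deltas: string[], citations: string[], sources: string[]): string[] {
  const frames = deltas.map((text) => encodeSseEvent({ text }));
  frames.push(encodeSseEvent({ done: true, citations, sources }));
  return frames;
}

const frames = toSseFrames(["Q3 revenue ", "was ..."], [], []);
console.log(JSON.stringify(frames[0])); // "data: {\"text\":\"Q3 revenue \"}\n\n"
console.log(frames.length);             // 3
```

JSON.stringify escapes any newlines inside the text, so a delta can never break the one-line data: framing.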
Step 17: Build the Quality Gates
Create src/eval/metrics.ts — the eval sample helpers:
ts
import { EvaluationSampleSchema, type EvaluationSample, type SampleEvalResult } from "@reaatech/rag-eval-core"

export function validateSample(raw: unknown): EvaluationSample {
  return EvaluationSampleSchema.parse(raw)
}

export function createSample(
  query: string,
  context: Array<{id: string; text: string}>,
  groundTruth: string,
  generatedAnswer: string
): Omit<EvaluationSample, "context"> & { context: Array<{id: string; text: string}> } {
  return { query, context, ground_truth: groundTruth, generated_answer: generatedAnswer }
}

export function resolveMetrics(results: SampleEvalResult[]): Record<string, number> {
  if (results.length === 0) {
    return { avg_faithfulness: 0, avg_relevance: 0, avg_context_recall: 0, overall_score: 0 }
  }
  let totalFaithfulness = 0
  let totalRelevance = 0
  let totalContextRecall = 0
  for (const r of results) {
    totalFaithfulness += r.faithfulness?.score ?? 0
    totalRelevance += r.relevance?.score ?? 0
    totalContextRecall += r.context_recall?.score ?? 0
  }
  const n = results.length
  const avgFaithfulness = totalFaithfulness / n
  const avgRelevance = totalRelevance / n
  const avgContextRecall = totalContextRecall / n
  return {
    avg_faithfulness: avgFaithfulness,
    avg_relevance: avgRelevance,
    avg_context_recall: avgContextRecall,
    overall_score: (avgFaithfulness + avgRelevance + avgContextRecall) / 3,
  }
}
Now create src/eval/gates.ts — the evaluation engine that runs quality gates:
ts
import { GateEngine, CIIntegration } from "@reaatech/rag-eval-gate"
import { CostTracker, CostReporter, Pricing, BudgetManager } from "@reaatech/rag-eval-cost"
import { DatasetLoader, DatasetValidator, loadEvalConfig as loadEvalConfigFromDataset } from "@reaatech/rag-eval-dataset"
import type { EvalResults, GateConfig, EvaluationSample, EvalSuiteConfig } from "@reaatech/rag-eval-core"

export function initGateEngine(): GateEngine {
  const gates: GateConfig[] = [
    {
      name: "min-faithfulness",
      type: "threshold",
      metric: "avg_faithfulness",
      threshold: 0.85,
      operator: ">=",
    },
The gates enforce: faithfulness >= 0.85, relevance >= 0.80, and no regression on overall score compared to baseline. The CIIntegration formatter produces a markdown report and a CI-friendly exit code so you can fail a nightly build when quality drops.
Expected output: Running the eval against results that meet thresholds returns { passed: true }. Dropping faithfulness to 0.75 causes the min-faithfulness gate to fail with a clear message showing what was expected vs what was received.
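The threshold logic behind these gates is simple enough to sketch on its own. The code below is a standalone illustration and not the GateEngine implementation: evaluateGates, ThresholdGate, and the message wording are assumptions, although the gate fields mirror the ones in the listing above.

```typescript
type ThresholdGate = { name: string; metric: string; threshold: number; operator: ">=" | "<=" };
type GateOutcome = { name: string; passed: boolean; message: string };

function evaluateGates(
  metrics: Record<string, number>,
  gates: ThresholdGate[],
): { passed: boolean; outcomes: GateOutcome[] } {
  const outcomes = gates.map((gate): GateOutcome => {
    const value = metrics[gate.metric];
    // A metric that was never computed counts as a failure.
    if (value === undefined) {
      return { name: gate.name, passed: false, message: `metric ${gate.metric} is missing` };
    }
    const passed = gate.operator === ">=" ? value >= gate.threshold : value <= gate.threshold;
    const message = `expected ${gate.metric} ${gate.operator} ${gate.threshold}, received ${value}`;
    return { name: gate.name, passed, message };
  });
  return { passed: outcomes.every((o) => o.passed), outcomes };
}

const sampleGates: ThresholdGate[] = [
  { name: "min-faithfulness", metric: "avg_faithfulness", threshold: 0.85, operator: ">=" },
  { name: "min-relevance", metric: "avg_relevance", threshold: 0.8, operator: ">=" },
];

const report = evaluateGates({ avg_faithfulness: 0.75, avg_relevance: 0.9 }, sampleGates);
console.log(report.passed);              // false
console.log(report.outcomes[0].message); // expected avg_faithfulness >= 0.85, received 0.75
```

In CI, the overall passed flag would map to the process exit code.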
Step 18: Add the Chat UI
Replace app/page.tsx with a chat interface that consumes the SSE stream:
The full file includes a render function with a message list, colored chat bubbles, source citations rendered as clickable links, and an input bar with a Send button. Build it and run:
terminal
pnpm dev
Open http://localhost:3000 and try asking a question.
Expected output: A clean chat interface. Typing “Show me the budget spreadsheet from last week” streams Claude’s answer token by token, with clickable source citations at the bottom of each response.
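On the client, network chunks do not line up with SSE event boundaries, so the page needs to buffer text until a complete event (ending in a blank line) has arrived. The following is a minimal sketch of such a parser. SseParser is a hypothetical class, not the code in app/page.tsx, and it assumes every event is a single data: line containing JSON.

```typescript
// Incremental parser: feed it decoded text chunks, get back complete events.
class SseParser {
  private buffer = "";

  push(chunk: string): unknown[] {
    this.buffer += chunk;
    const events: unknown[] = [];
    let boundary = this.buffer.indexOf("\n\n");
    while (boundary !== -1) {
      const frame = this.buffer.slice(0, boundary);
      this.buffer = this.buffer.slice(boundary + 2);
      for (const line of frame.split("\n")) {
        if (line.startsWith("data: ")) {
          events.push(JSON.parse(line.slice("data: ".length)));
        }
      }
      boundary = this.buffer.indexOf("\n\n");
    }
    return events; // anything after the last blank line stays buffered
  }
}

const parser = new SseParser();
// The first chunk ends mid-event, so nothing is emitted yet.
console.log(parser.push('data: {"text":"Hel'));                     // []
// The second chunk completes that event and carries a full second one.
console.log(parser.push('lo"}\n\ndata: {"done":true}\n\n'));         // [{ text: "Hello" }, { done: true }]
```

In the browser, the chunks would come from reading the fetch response body and decoding it with TextDecoder in streaming mode.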
Step 19: Create a Chat Flow Integration Test
Create tests/integration/chat-flow.test.ts that verifies the cache, session, and cost modules work together:
Expected output: All tests pass with clean exit code. Three integration tests confirm the cache miss/hit flow, session round-trip, and cost span accumulation all work together.
Next steps
Replace the in-memory adapters with Redis-backed storage for the session manager and cache engine so state survives server restarts
Add a cron job to run runNightlyIngestion() on a schedule using Vercel Cron Jobs or a node-cron scheduler
Expand eval datasets with domain-specific Q&A pairs for your team’s knowledge base and add more quality gates (factual-consistency, answer-relevancy)
Deploy the webhook trigger so the ingestion pipeline runs immediately when a new email arrives or a document is modified, using Google Workspace push notifications
36).slice(2, 9)}`
}

export class MapSessionAdapter implements IStorageAdapter {
  private sessions: Map<string, Session> = new Map()
  private messages: Map<string, Message[]> = new Map()