Sales reps and managers waste hours clicking through Pipedrive to find deal statuses and next steps; a conversational assistant would speed up daily stand-ups and pipeline reviews.
A complete, working implementation of this recipe — downloadable as a zip or browsable file by file. Generated by our build pipeline; tested with full coverage before publishing.
Build a conversational AI agent that knows your Pipedrive CRM pipeline. This tutorial walks you through creating a Next.js-backed knowledge agent that syncs deals, persons, and activities from Pipedrive — then lets you chat with the data using natural language. The assistant retrieves relevant CRM records from Qdrant (powered by Voyage AI embeddings), streams responses from Anthropic Claude, maintains multi-turn conversations, caches common queries, and logs evaluation data to Langfuse.
It is designed for small business owners, sales reps, and developers who want to add a chat interface to their existing Pipedrive setup without manual data entry or complex BI tooling.
Prerequisites
Node.js 22+ and pnpm 10 (pnpm@10.0.0 recommended)
A Pipedrive account with an API token (Settings > API > “Your API token”)
An Anthropic API key (console.anthropic.com)
A Voyage AI API key (voyageai.com)
A Qdrant instance — local via Docker (docker run -p 6333:6333 qdrant/qdrant) or cloud at qdrant.tech
A Langfuse account (optional — tracing works without it if the env var is unset)
Basic familiarity with Next.js App Router and TypeScript
Step 1: Scaffold the project and install dependencies
The project starts from a Next.js 16+ App Router scaffold. Create the directory and set up the project:
Replace every <...> placeholder with real values. The SYNC_INTERVAL_MS controls how often the cron sync pulls fresh data from Pipedrive (5 minutes by default). Langfuse keys are optional — the app gracefully no-ops when they are missing.
Step 3: Create the type definitions
Start with the data types that flow through the system. Create src/types/pipedrive.ts:
Expected output: TypeScript definitions are ready. These interfaces will be imported across every module in the project.
Step 4: Build the Qdrant vector storage adapter
The QdrantMemoryStorage class wraps QdrantClient to implement the full MemoryStorage interface from @reaatech/agent-memory-core. It handles CRUD, batch operations, health checks, and serialization between Qdrant points and Memory objects.
Create src/lib/qdrant-storage.ts with the PIPEDRIVE_MEMORIES_COLLECTION constant and the QdrantMemoryStorage class. The full source includes memoryToPoint and pointToMemory helpers that map Memory fields (tenantId, content, type, confidence, embeddings.vector, timestamps) into Qdrant’s point payload structure and back. Here is the core adapter (helper types and full MemoryStorage interface elided for brevity):
Expected output: A storage adapter that maps between the REAA Memory model and Qdrant’s REST API. The collection uses 1024-dimensional vectors (matching Voyage AI’s voyage-3 model) with cosine distance.
Step 5: Create the Voyage AI embedding provider
This adapter implements EmbeddingProvider from @reaatech/agent-memory-embedding. It wraps the Voyage AI SDK and also creates a cached version.
Create src/lib/voyage-embedder.ts:
ts
import { VoyageAIClient } from "voyageai";import { type EmbeddingProvider, type ModelInfo, CachedEmbeddingProvider, InMemoryEmbeddingCache,} from "@reaatech/agent-memory-embedding";export class VoyageAIEmbeddingProvider implements EmbeddingProvider { private voyageClient: VoyageAIClient; constructor(voyageClient: VoyageAIClient) { this.voyageClient = voyageClient; } async embed(text: string): Promise<number[]> { const response = await this.voyageClient.embed({ input: text, model: "voyage-3", }); const data = response.data as Array<{ embedding: number[] }>; return data[0].embedding; } async embedBatch(texts: string[]): Promise<number[][]> { const response = await this.voyageClient.embed({ input: texts, model: "voyage-3", }); const data = response.data as Array<{ embedding: number[] }>; return data.map((d) => d.embedding); } getModelInfo(): ModelInfo { return { name: "voyage-3", dimensions: 1024, maxInputLength: 16000 }; }}const voyageClient = new VoyageAIClient({ apiKey: process.env.VOYAGE_API_KEY ?? "",});const baseEmbedder = new VoyageAIEmbeddingProvider(voyageClient);export const embedder = new CachedEmbeddingProvider( baseEmbedder, new InMemoryEmbeddingCache({ maxSize: 500, ttlMs: 300000 }),);
Expected output: A reusable embedding provider. The embedder singleton wraps the raw provider with an in-memory cache (500 entries, 5-minute TTL) so repeated calls for the same text avoid API round-trips.
Step 6: Build the session storage adapter
Multi-turn conversations need persistent session state. The MapStorageAdapter implements IStorageAdapter from @reaatech/session-continuity using in-memory Maps. It supports CRUD on sessions and messages, optimistic concurrency via version numbers, and expiry. The full source includes listSessions, updateMessage, deleteMessage, and deleteAllMessages — here is the core:
Create src/lib/session-storage.ts:
ts
import { type IStorageAdapter, type Session, type SessionId, type Message, type MessageId, type MessageQueryOptions, type SessionFilters, type UpdateSessionOptions, ConcurrencyError,} from "@reaatech/session-continuity";let sequenceCounter = 0;function nextSequence(): number { return ++sequenceCounter; }export class MapStorageAdapter implements IStorageAdapter { private sessions: Map<string, { session: Session }>
Expected output: An in-memory session store that supports all core IStorageAdapter methods, including concurrent update detection, multi-field message filtering, and expiry management.
Step 7: Wire up the Pipedrive sync engine
The PipedriveSync class fetches all deals, persons, and activities from Pipedrive using cursor-based pagination with retry, chunks each record into text fragments, embeds them via Voyage AI, and upserts the vectorized memories into Qdrant. Each fetch is wrapped in try/catch for partial-failure tolerance.
Create src/lib/sync.ts:
ts
import { Configuration, DealsApi, PersonsApi, ActivitiesApi } from "pipedrive/v2";import { withRetry, Memory, MemoryType, MemorySource, MemoryImportance, MemoryLifecycle } from "@reaatech/agent-memory-core";import type { PipedriveDeal, PipedrivePerson, PipedriveActivity, SyncResult } from "../types/pipedrive";import { VoyageAIEmbeddingProvider } from "./voyage-embedder";import { QdrantMemoryStorage } from "./qdrant-storage";function toStr(val: unknown): string { if (val === null || val === undefined) return ""; if (typeof val === "string") return val;
Expected output: A sync engine that paginates through all three Pipedrive endpoints with retry and partial-failure tolerance, converts records to chunked text, and indexes them as vectorized Memory objects in Qdrant. Each deal produces up to 3 chunks (overview, timeline, contacts), each person 1, and each activity 1.
Step 8: Build the service layer
The service layer wires the adapters together. Create these files:
src/services/session.ts — manages multi-turn conversations via SessionManager:
ts
import { SessionManager, type IStorageAdapter, type TokenCounter, type Message, type Session, type ConversationContextResult, SessionNotFoundError, TokenBudgetExceededError,} from "@reaatech/session-continuity";import { MapStorageAdapter } from "../lib/session-storage";class CharTokenCounter implements TokenCounter { readonly model = "char-based"; readonly tokenizer = "char-4"; count(text: string): number { return Math.ceil(text.length / 4); } countMessages(messages: Message[]): number { let total = 0; for (const msg of messages) { if (typeof msg.content === "string") total += this.count(msg.content); } return total; }}const storage: IStorageAdapter = new MapStorageAdapter();const tokenCounter = new CharTokenCounter();const sessionManager = new SessionManager({ storage, tokenCounter, tokenBudget: { maxTokens: 4096, reserveTokens: 500, overflowStrategy: "compress" }, compression: { strategy: "sliding_window", targetTokens: 3500 },});export async function createSession(userId?: string): Promise<Session> { return sessionManager.createSession({ userId });}export async function getSession(id: string): Promise<Session> { return sessionManager.getSession(id);}export async function addMessage(sessionId: string, message: { role: "user" | "assistant"; content: string }): Promise<Message> { return sessionManager.addMessage(sessionId, { role: message.role, content: message.content });}export async function getConversationContext(sessionId: string): Promise<Message[]> { return sessionManager.getConversationContext(sessionId);}export async function endSession(id: string): Promise<void> { return sessionManager.endSession(id);}export { sessionManager, CharTokenCounter, SessionNotFoundError, TokenBudgetExceededError };
src/services/retrieval.ts — retrieves relevant Pipedrive memories from Qdrant and formats them for the LLM prompt:
ts
import { MemoryRetriever, ContextInjector, RetrievalStrategy } from "@reaatech/agent-memory-retrieval";import { Memory, MemoryType } from "@reaatech/agent-memory-core";import type { EmbeddingProvider } from "@reaatech/agent-memory-embedding";import { QdrantMemoryStorage } from "../lib/qdrant-storage";import { embedder } from "../lib/voyage-embedder";import { QdrantClient } from "@qdrant/js-client-rest";const qdrantClient = new QdrantClient({ url: process.env.QDRANT_URL ?? "http://localhost:6333" });const qdrantStorage = new QdrantMemoryStorage(qdrantClient);const retriever = new MemoryRetriever(qdrantStorage, embedder, { defaultLimit: 5, useCrossEncoder: false, diversityFactor: 0.3, strategies: [RetrievalStrategy.SEMANTIC, RetrievalStrategy.RECENCY],});const injector = new ContextInjector(100000, 4);export async function searchRelevantMemories(query: string, tenantId: string, limit?: number): Promise<Memory[]> { return retriever.retrieve(query, { tenantId, limit, filters: { types: [MemoryType.FACT] } });}export async function formatContextForPrompt( memories: Memory[], conversationTurns: Array<{ speaker: string; content: string; timestamp: Date }>, tokenBudget: number,): Promise<string> { const turns = conversationTurns.map((t) => ({ speaker: t.speaker as "user" | "agent", content: t.content, timestamp: t.timestamp, })); return injector.injectMemoriesIntoContext(turns, memories, tokenBudget);}export { retriever, injector };
src/services/anthropic.ts — calls the Anthropic API with Claude Sonnet 4, including streaming and a cache-integrated generation helper:
Expected output: Five service modules that together handle session management, vector retrieval, Anthropic streaming (with caching), LLM response caching, and RAG evaluation logging. The embedder from the Voyage adapter is reused across both the retrieval and cache services.
Step 9: Create the Next.js API routes
Chat endpoint
app/api/chat/route.ts handles streaming chat completions with SSE. It validates input with Zod, retrieves or creates a session, checks the cache, searches relevant memories, streams the Anthropic response, and logs everything.
ts
import { NextRequest, NextResponse } from "next/server";import { z } from "zod";import { createSession, getSession, getConversationContext, addMessage, SessionNotFoundError,} from "../../../src/services/session.js";import { searchRelevantMemories, formatContextForPrompt } from "../../../src/services/retrieval.js";import { streamChatResponse } from "../../../src/services/anthropic.js";import { getCachedResponse, storeResponse } from "../../../src/services/cache.js";import { logToLangfuse } from "../../../src/services/eval.js";const chatSchema = z.object({ message: z.string().min(1), sessionId: z.
Sync endpoint
app/api/sync/route.ts triggers a Pipedrive data sync:
ts
import { NextRequest, NextResponse } from "next/server";import { Configuration } from "pipedrive/v2";import { PipedriveSync } from "../../../src/lib/sync";import { VoyageAIEmbeddingProvider } from "../../../src/lib/voyage-embedder";import { VoyageAIClient } from "voyageai";import { QdrantMemoryStorage } from "../../../src/lib/qdrant-storage";import { QdrantClient } from "@qdrant/js-client-rest";export async function POST(req: NextRequest): Promise<NextResponse> { try { let fullResync = false; try { const body = (await req.json()) as { fullResync?: boolean }; fullResync = body.fullResync === true; } catch { /* no body */ } const apiConfig = new Configuration({ apiKey: process.env.PIPEDRIVE_API_TOKEN ?? "" }); const qdrantClient = new QdrantClient({ url: process.env.QDRANT_URL ?? "http://localhost:6333" }); const storage = new QdrantMemoryStorage(qdrantClient); const voyageClient = new VoyageAIClient({ apiKey: process.env.VOYAGE_API_KEY ?? "" }); const voyageEmbedder = new VoyageAIEmbeddingProvider(voyageClient); const syncEngine = new PipedriveSync(apiConfig, voyageEmbedder, storage); const result = await syncEngine.sync(); return NextResponse.json({ ok: true, result, fullResync }); } catch (error) { const msg = error instanceof Error ? error.message : "Internal server error"; if (msg.includes("403") || msg.includes("401") || msg.includes("Pipedrive")) return NextResponse.json({ ok: false, error: msg }, { status: 502 }); return NextResponse.json({ ok: false, error: msg }, { status: 500 }); }}
Expected output: Two API route handlers — /api/chat for streaming conversational queries and /api/sync for triggering data syncs. Both use NextRequest/NextResponse with proper error handling and content-type headers.
Step 10: Set up the instrumentation for cron sync
The src/instrumentation.ts file hooks into Next.js’s server startup to run a periodic Pipedrive sync. This requires experimental.instrumentationHook: true in next.config.ts.
The NEXT_RUNTIME === "nodejs" guard ensures this only runs on the server, not in the Edge runtime. All imports are dynamic to avoid loading Node-only modules at Edge compile time.
Expected output: Every 5 minutes (or whatever SYNC_INTERVAL_MS is set to), the server automatically pulls the latest Pipedrive data and reindexes it into Qdrant. A sync also runs immediately on server start.
Step 11: Update the landing page and layout
Create app/page.tsx with a minimal landing page:
tsx
export default function Home() { return ( <main style={{ maxWidth: 720, margin: "40px auto", padding: "0 20px", fontFamily: "system-ui" }}> <h1>Pipedrive Sales Assistant</h1> <p>Chat with your Pipedrive pipeline using natural language.</p> <h2>API Endpoints</h2> <ul> <li><code>POST /api/chat</code> — Send a message <code>{"{ message, sessionId?, userId? }"}</code></li> <li><code>POST /api/sync</code> — Trigger Pipedrive data sync <code>{"{ fullResync?: boolean }"}</code></li> </ul> </main> );}
Update app/layout.tsx to set the page metadata and configure Geist fonts:
tsx
import type { Metadata } from "next";import { Geist, Geist_Mono } from "next/font/google";import "./globals.css";const geistSans = Geist({ variable: "--font-geist-sans", subsets: ["latin"] });const geistMono = Geist_Mono({ variable: "--font-geist-mono", subsets: ["latin"] });export const metadata: Metadata = { title: "Pipedrive Knowledge Agent", description: "Chat with your Pipedrive pipeline using natural language — deal summaries, follow-up reminders, and win probability.",};export default function RootLayout({ children }: Readonly<{ children: React.ReactNode }>) { return ( <html lang="en" className={`${geistSans.variable} ${geistMono.variable}`}> <body>{children}</body> </html> );}
Expected output: A clean landing page that documents the two API endpoints, and a root layout that sets page title, description, and Geist font variables.
Step 12: Run the tests and verify
The project includes 213 tests across 92 test suites covering every adapter, service, and API route. Run them with:
terminal
pnpm test
This runs vitest run --coverage --reporter=json --outputFile=vitest-report.json. Your output should look similar to:
Expected output: All 213 tests pass, TypeScript compiles clean, and ESLint reports zero errors. The next.config.ts uses experimental.instrumentationHook: true (required because src/instrumentation.ts exists), and all route handlers use NextRequest/NextResponse (no bare Request or new Response(JSON.stringify(...))).
Next steps
Add a web UI — Build a chat frontend that consumes the SSE stream from /api/chat and renders deal cards when the agent returns CRM data. A React component using EventSource or the Fetch API’s ReadableStream is a natural next step.
Extend to calendar events — Modify the sync engine to also pull Pipedrive calendar activities and filter meetings by date range. This lets you ask “What’s on my calendar this week?”
Add user authentication — Replace the userId || "anonymous" fallback with a proper auth middleware (NextAuth, Clerk, or custom JWT) so each user gets their own isolated Pipedrive context and session history.
Deploy to production — Swap the in-memory session store for PostgreSQL (via @reaatech/session-continuity-pg), replace the in-memory cache with Redis, and point Qdrant at a managed cluster. Add rate limiting with a token-bucket middleware per user.
=
new
Map
();
private messages: Map<string, Map<string, Message>> = new Map();
chunks.push(`Deal Timeline: expected close ${deal.expectedCloseDate}, last activity ${deal.lastActivityDate}, next activity ${deal.nextActivityDate}`);
chunks.push(`Deal Contacts: person "${deal.personName}", owner "${deal.ownerName}"`);
const systemPrompt = `You are a knowledgeable Pipedrive sales assistant. Your role is to help sales reps and managers understand their pipeline using natural language.\n\nUse the following retrieved context to answer questions about deals, persons, and activities:\n\n${formattedContext}\n\nAnswer concisely and accurately based on the available data. If you don't have enough information, say so clearly.`;