This tutorial builds a conversational knowledge agent powered by Cohere that answers financial questions by querying a PostgreSQL-backed knowledge base with semantic caching. The agent understands natural language questions about transactions, expenses, and cash flow — letting small business owners ask “How much did I spend on office supplies last month?” and get an answer from their financial data without writing SQL.
You’ll build it inside a Next.js project shell, with an Express API server backed by pgvector for vector search, fastembed for embeddings, and several REAA packages that handle multi-turn memory, confidence-based routing, semantic caching, and structured output repair. A simple Next.js chat UI at app/page.tsx ships with the scaffold so you can try the API right away.
Prerequisites
Node.js 22+ and pnpm 10+
A Cohere API key (from the Cohere dashboard)
A PostgreSQL instance with the pgvector extension enabled
Langfuse account (optional, for observability)
Familiarity with TypeScript and Express middleware patterns
Step 1: Scaffold the project and install dependencies
The project starts from a Next.js 16 scaffold with the App Router. Create the project structure and install all dependencies:
terminal
mkdir cohere-financial-agent && cd cohere-financial-agent
# Initialize package.json
pnpm init
Open package.json and set it to type module, then add the scripts and dependencies:
Expected output: after you run pnpm install, pnpm reports that all packages are installed and linked. You’ll see node_modules/, pnpm-lock.yaml, and a next.config.ts in the project root.
The next, react, and react-dom packages come with the Next.js scaffold — next dev starts the Next.js development server that serves a chat UI from app/page.tsx. The Express API server you’ll build in the following steps runs separately on port 3001.
Step 2: Configure environment variables
Create .env in the project root. The agent needs a Cohere API key, a PostgreSQL connection string, and optional Langfuse credentials for observability:
Expected output: Two files at the project root with the keys listed above.
Step 3: Build the configuration module
Create src/lib/config.ts to validate all environment variables at startup using Zod. This catches misconfiguration early instead of failing mid-request:
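For readers who want to see the shape of those startup checks, here is a rough, dependency-free sketch of the same fail-fast idea. It is not the tutorial's Zod schema: the variable names COHERE_API_KEY and DATABASE_URL are assumptions (only PORT, LANGFUSE_PUBLIC_KEY, and LANGFUSE_SECRET_KEY appear elsewhere in this tutorial), and the PORT default of 3001 simply matches the port the Express API listens on.

```typescript
// Dependency-free sketch of fail-fast config validation.
// Assumption: COHERE_API_KEY and DATABASE_URL are hypothetical names; the real
// src/lib/config.ts uses a Zod schema rather than hand-written checks.
export interface AppConfig {
  COHERE_API_KEY: string;
  DATABASE_URL: string;
  PORT: number;
  LANGFUSE_PUBLIC_KEY?: string;
  LANGFUSE_SECRET_KEY?: string;
}

export function loadConfig(env: Record<string, string | undefined>): AppConfig {
  const problems: string[] = [];

  // Required values must be present and non-blank.
  for (const key of ["COHERE_API_KEY", "DATABASE_URL"]) {
    const value = env[key];
    if (value === undefined || value.trim() === "") problems.push(`${key} is required`);
  }

  // The connection string should at least look like a PostgreSQL URL.
  const dbUrl = env.DATABASE_URL ?? "";
  if (dbUrl !== "" && !/^postgres(ql)?:\/\//.test(dbUrl)) {
    problems.push("DATABASE_URL must start with postgres:// or postgresql://");
  }

  // PORT is optional and defaults to 3001, the Express API port in this tutorial.
  const port = env.PORT === undefined ? 3001 : Number(env.PORT);
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    problems.push("PORT must be an integer between 1 and 65535");
  }

  // Report every problem at once so a bad deploy fails at startup, not mid-request.
  if (problems.length > 0) {
    throw new Error(`Invalid configuration:\n- ${problems.join("\n- ")}`);
  }

  return {
    COHERE_API_KEY: env.COHERE_API_KEY as string,
    DATABASE_URL: dbUrl,
    PORT: port,
    LANGFUSE_PUBLIC_KEY: env.LANGFUSE_PUBLIC_KEY,
    LANGFUSE_SECRET_KEY: env.LANGFUSE_SECRET_KEY,
  };
}
```

You would call something like this once at startup, for example loadConfig(process.env), and export the result; src/index.ts reads config.PORT from a module of this kind.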
The financial_transactions table holds transaction data with a 384-dimensional vector column for embeddings. The conversation_history table stores multi-turn chat context per session.
Expected output: A database module that exports searchSimilarTransactions, storeConversation, and getConversationHistory — each wrapped in withRetry for resilience.
Step 5: Create the embedding service
Create src/services/embedding.ts to generate vector embeddings using fastembed’s BGE model:
ts
import { EmbeddingModel, FlagEmbedding } from "fastembed";

let model: Awaited<ReturnType<typeof FlagEmbedding.init>> | null = null;

// Lazily load the model once and reuse it for every request.
async function getModel() {
  if (model) return model;
  try {
    // BGE small produces 384-dimensional vectors, matching the
    // financial_transactions vector column. BGEBaseEN would produce 768.
    model = await FlagEmbedding.init({ model: EmbeddingModel.BGESmallEN });
    return model;
  } catch (cause) {
    throw new Error(
      `Failed to initialize embedding model: ${cause instanceof Error ? cause.message : String(cause)}`,
    );
  }
}

// Embed a single user question.
export async function embedQuery(query: string): Promise<number[]> {
  const m = await getModel();
  const result = await m.queryEmbed(query);
  return Array.from(result);
}

// Return the async generator so callers can consume embeddings batch by batch.
export async function embedDocuments(
  docs: string[],
  batchSize?: number,
): Promise<AsyncGenerator<number[][]>> {
  const m = await getModel();
  return m.passageEmbed(docs, batchSize);
}

// Drain every batch into a single array of vectors.
export async function embedBatch(
  texts: string[],
  batchSize?: number,
): Promise<number[][]> {
  const m = await getModel();
  const generator = m.passageEmbed(texts, batchSize);
  const results: number[][] = [];
  for await (const batch of generator) {
    results.push(...batch);
  }
  return results;
}
Embedding converts a user’s natural language question into a vector that pgvector uses to find similar transactions via L2 distance. The BGE model outputs 384-dimensional vectors.
Expected output: Three exports. embedQuery embeds a single question, embedDocuments returns an async generator that yields embeddings one batch at a time, and embedBatch drains that generator into a single array of vectors.
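pgvector’s <-> operator ranks rows by Euclidean (L2) distance. The sketch below computes that same distance in plain TypeScript and serializes an embedding into pgvector’s text input format ('[0.1,0.2,...]'), which is one common way to pass a vector as a query parameter. The helper names and the SQL string, including the embedding column name, are assumptions for illustration; the tutorial’s searchSimilarTransactions may be written differently.

```typescript
// Euclidean (L2) distance: the metric behind pgvector's <-> operator.
export function l2Distance(a: number[], b: number[]): number {
  if (a.length !== b.length) {
    throw new Error(`Dimension mismatch: ${a.length} vs ${b.length}`);
  }
  let sum = 0;
  for (let i = 0; i < a.length; i++) {
    const d = a[i] - b[i];
    sum += d * d;
  }
  return Math.sqrt(sum);
}

// pgvector accepts vectors as text literals such as '[1,2,3]'.
export function toVectorLiteral(embedding: number[]): string {
  return `[${embedding.join(",")}]`;
}

// Hypothetical query: the `embedding` column name is an assumption.
// $1 is the vector literal cast to the vector type; $2 is the row limit.
export const SIMILAR_TRANSACTIONS_SQL = `
  SELECT *, embedding <-> $1::vector AS distance
  FROM financial_transactions
  ORDER BY embedding <-> $1::vector
  LIMIT $2`;
```

The dimension check mirrors pgvector itself, which rejects a vector whose length differs from the column’s declared dimension, so the embedding model and the 384-dimensional column have to agree.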
Step 6: Add multi-turn conversation memory
Create src/services/memory.ts to manage conversation context using the @reaatech/agent-memory-core package:
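This tutorial doesn’t document the @reaatech/agent-memory-core API, so as a stand-in that only illustrates the idea, the class below keeps a bounded window of turns per session in process memory. The method names mirror the addTurn, getSessionContext, and clearSession functions that the test suite exercises, but the class, its signatures, and the default window size are hypothetical and are not the package’s API.

```typescript
// Hypothetical stand-in for session memory; NOT the @reaatech/agent-memory-core API.
export type Role = "user" | "assistant";

export interface Turn {
  role: Role;
  content: string;
}

export class SessionMemory {
  private readonly sessions = new Map<string, Turn[]>();
  private readonly maxTurns: number;

  // maxTurns bounds how much history is replayed into each prompt (assumed default).
  constructor(maxTurns = 10) {
    this.maxTurns = maxTurns;
  }

  addTurn(sessionId: string, role: Role, content: string): void {
    const turns = this.sessions.get(sessionId) ?? [];
    turns.push({ role, content });
    // Drop the oldest turns once the window is full.
    if (turns.length > this.maxTurns) turns.splice(0, turns.length - this.maxTurns);
    this.sessions.set(sessionId, turns);
  }

  // Render the window as plain text for use as prompt context.
  getSessionContext(sessionId: string): string {
    return (this.sessions.get(sessionId) ?? [])
      .map((t) => `${t.role}: ${t.content}`)
      .join("\n");
  }

  clearSession(sessionId: string): void {
    this.sessions.delete(sessionId);
  }
}
```

An in-process Map loses history on restart and isn’t shared between server instances; durable, per-session context is what the conversation_history table is for.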
The CacheEngine wraps dual storage (exact-match in memory, semantic-match via embeddings). Semantically similar questions, such as “What did I spend on food?” and “Show my grocery costs”, can hit the cache without calling Cohere, as long as their embeddings are close enough to clear the cache’s similarity threshold.
Expected output: cacheGet, cacheSet, and cacheInvalidate functions backed by the REAA llm-cache engine with fastembed for vector comparison.
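To make the two-tier lookup concrete, here is a minimal sketch that assumes the caller already has the query embedding (the chat route computes one for the pgvector search anyway). It tries an exact match on the normalized question first, then the closest stored embedding by cosine similarity if it clears a threshold. The class name, the 0.9 threshold, and the linear scan are illustrative assumptions, not the llm-cache API.

```typescript
// Hypothetical two-tier semantic cache; NOT the @reaatech/llm-cache API.
interface Entry {
  embedding: number[];
  answer: string;
}

function cosineSimilarity(a: number[], b: number[]): number {
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return normA === 0 || normB === 0 ? 0 : dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

export class SemanticCache {
  private readonly entries = new Map<string, Entry>();
  private readonly threshold: number;

  constructor(threshold = 0.9) {
    this.threshold = threshold;
  }

  // Normalize case and whitespace so trivially different strings share a key.
  private key(question: string): string {
    return question.trim().toLowerCase().replace(/\s+/g, " ");
  }

  set(question: string, embedding: number[], answer: string): void {
    this.entries.set(this.key(question), { embedding, answer });
  }

  get(question: string, embedding: number[]): string | undefined {
    // Tier 1: exact match, no vector math needed.
    const exact = this.entries.get(this.key(question));
    if (exact) return exact.answer;

    // Tier 2: best semantic match, accepted only above the threshold.
    let best: Entry | undefined;
    let bestScore = -Infinity;
    for (const entry of this.entries.values()) {
      const score = cosineSimilarity(embedding, entry.embedding);
      if (score > bestScore) {
        bestScore = score;
        best = entry;
      }
    }
    return best !== undefined && bestScore >= this.threshold ? best.answer : undefined;
  }

  // Clear everything, e.g. after new transactions are imported.
  invalidate(): void {
    this.entries.clear();
  }
}
```

The threshold is the main tuning knob. For financial data it deserves care: a question about last month’s spending and one about this month’s can embed very close together yet need different answers, so a threshold that is too low returns stale or wrong figures.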
Step 9: Connect the Cohere API
Create src/services/cohere.ts to call Cohere’s chat endpoint:
ts
import { CohereClientV2, CohereError, CohereTimeoutError } from "cohere-ai";

// With no token option, the SDK reads the API key from the CO_API_KEY environment variable.
const cohere = new CohereClientV2({});

const MODEL = "command-a-03-2025";

// Concatenate every text part of a v2 message (non-text parts contribute "").
function extractContent(
  content: string | { type?: string; text?: string }[] | null | undefined,
): string {
  if (typeof content === "string") return content;
  if (Array.isArray(content)) {
    return content.map((part) => part.text ?? "").join("");
  }
  return "";
}

// Shared prompt for both the blocking and the streaming call.
function buildMessages(context: string, question: string) {
  return [
    {
      role: "system" as const,
      content: `You are a financial analytics assistant. Use the following context to answer the user's question accurately.

Context:
${context}`,
    },
    { role: "user" as const, content: question },
  ];
}

// Map SDK errors to plain Errors with readable messages; anything else passes through.
function toServiceError(cause: unknown): unknown {
  if (cause instanceof CohereTimeoutError) {
    return new Error(`Cohere request timed out: ${cause.message}`);
  }
  if (cause instanceof CohereError) {
    return new Error(
      `Cohere API error (status ${String(cause.statusCode ?? "unknown")}): ${cause.message}`,
    );
  }
  return cause;
}

export async function generateAnswer(
  context: string,
  question: string,
): Promise<string> {
  try {
    const response = await cohere.chat({
      model: MODEL,
      messages: buildMessages(context, question),
    });
    return extractContent(response.message.content);
  } catch (cause) {
    throw toServiceError(cause);
  }
}

export async function* generateAnswerStream(
  context: string,
  question: string,
): AsyncGenerator<string, void, undefined> {
  let stream;
  try {
    stream = await cohere.chatStream({
      model: MODEL,
      messages: buildMessages(context, question),
    });
  } catch (cause) {
    throw toServiceError(cause);
  }
  for await (const chatEvent of stream) {
    if (chatEvent.type === "content-delta") {
      // In the v2 stream, each delta's text lives at delta.message.content.text.
      const text = chatEvent.delta?.message?.content?.text;
      if (typeof text === "string") {
        yield text;
      }
    }
  }
}
The generateAnswer function passes the conversation history and relevant transactions as context, then extracts the text from Cohere’s response. The generateAnswerStream variant yields streaming content deltas.
Expected output: Two functions: generateAnswer for complete (non-streaming) responses and generateAnswerStream for streaming text deltas.
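The chat route assembles the context string before calling generateAnswer. One possible shape is sketched below, assuming a simple transaction record; the TransactionRow field names, the sign convention for amounts, and buildContext itself are hypothetical and are not the financial_transactions schema.

```typescript
// Hypothetical transaction shape; the real financial_transactions columns may differ.
export interface TransactionRow {
  date: string; // ISO date, e.g. "2025-03-14"
  description: string;
  category: string;
  amount: number; // assumed convention: negative for expenses, positive for income
}

export interface HistoryTurn {
  role: "user" | "assistant";
  content: string;
}

// Build the string passed as the first argument of generateAnswer(context, question).
export function buildContext(history: HistoryTurn[], rows: TransactionRow[]): string {
  const historyText =
    history.length > 0
      ? history.map((t) => `${t.role}: ${t.content}`).join("\n")
      : "(no prior conversation)";

  const rowsText =
    rows.length > 0
      ? rows
          .map((r) => `- ${r.date} | ${r.category} | ${r.description} | ${r.amount.toFixed(2)}`)
          .join("\n")
      : "(no matching transactions)";

  return `Conversation so far:\n${historyText}\n\nRelevant transactions:\n${rowsText}`;
}
```

Labeling the two sections tends to help the model keep prior turns separate from the transaction data it should treat as ground truth. Keep the number of rows small, since every row costs prompt tokens.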
Step 10: Add structured output repair
Create src/services/repair.ts to fix malformed LLM responses:
ts
import {
  repair,
  repairOutput,
  isValid,
  analyzeInput,
  UnrepairableError,
  type RepairResult,
} from "@reaatech/structured-repair-core";
import { z } from "zod";

const AnswerSchema = z.object({
  answer: z.string(),
  confidence: z.number().min(0).max(1).optional(),
  sources: z.array(z.string()).optional(),
});

export type AnswerType = z.infer<typeof AnswerSchema>;

const FALLBACK_ANSWER: AnswerType = {
  answer: "I couldn't process the response. Please try again.",
  confidence: 0,
};

export async function repairAnswer(rawOutput: string): Promise<AnswerType> {
  try {
    return await repair(AnswerSchema, rawOutput);
  } catch (cause) {
    if (cause instanceof UnrepairableError) {
      return FALLBACK_ANSWER;
    }
    throw cause;
  }
}

export function validateAnswer(data: unknown): boolean {
  const strData = typeof data === "string" ? data : JSON.stringify(data);
  return isValid(AnswerSchema, strData);
}

export function repairWithDiagnostics(
  rawOutput: string,
): RepairResult<AnswerType> {
  try {
    const result: RepairResult<AnswerType> = repairOutput({
      schema: AnswerSchema,
      input: rawOutput,
      debug: true,
    });
    return result;
  } catch (cause) {
    if (cause instanceof UnrepairableError) {
      const result: RepairResult<AnswerType> = {
        success: false,
        data: FALLBACK_ANSWER,
        originalInput: rawOutput,
        steps: [],
        errors: [{ message: cause.message, code: "UNREPAIRABLE" }],
      };
      return result;
    }
    throw cause;
  }
}

export function analyzeRawOutput(rawOutput: string) {
  return analyzeInput(rawOutput);
}
The @reaatech/structured-repair-core package attempts to fix common LLM output problems — code fences around JSON, truncated responses, extra whitespace, type coercion, and hallucinated fields. If all strategies fail, a safe fallback answer is returned instead of crashing.
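Expected output: A repair service that returns a valid AnswerType object (with answer, optional confidence, and optional sources) for repairable output and a safe fallback answer for unrepairable output. Errors other than UnrepairableError are rethrown rather than swallowed.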
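To make the repair idea concrete, here is a deliberately small, dependency-free sketch of two such strategies (stripping a Markdown code fence and extracting the outermost JSON object), followed by a shape check that drops unknown fields and the same fallback answer. It is a hypothetical illustration, not the algorithm used by @reaatech/structured-repair-core, and it handles far fewer cases (no truncation repair or type coercion, for example).

```typescript
// Hypothetical minimal repair pipeline; NOT @reaatech/structured-repair-core.
export interface Answer {
  answer: string;
  confidence?: number;
  sources?: string[];
}

const FALLBACK: Answer = {
  answer: "I couldn't process the response. Please try again.",
  confidence: 0,
};

// Structural check equivalent to the AnswerSchema constraints.
function isAnswer(value: unknown): value is Answer {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  if (typeof v.answer !== "string") return false;
  if (v.confidence !== undefined &&
      (typeof v.confidence !== "number" || v.confidence < 0 || v.confidence > 1)) return false;
  if (v.sources !== undefined &&
      (!Array.isArray(v.sources) || !v.sources.every((s) => typeof s === "string"))) return false;
  return true;
}

export function repairAnswerSketch(raw: string): Answer {
  // Strategy 1: remove a surrounding ```json ... ``` fence if present.
  let text = raw.trim().replace(/^```(?:json)?\s*/i, "").replace(/\s*```$/, "");

  // Strategy 2: keep only the span from the first "{" to the last "}".
  const start = text.indexOf("{");
  const end = text.lastIndexOf("}");
  if (start !== -1 && end > start) text = text.slice(start, end + 1);

  try {
    const parsed: unknown = JSON.parse(text);
    if (isAnswer(parsed)) {
      // Copy only known fields, dropping anything extra the model added.
      const result: Answer = { answer: parsed.answer };
      if (parsed.confidence !== undefined) result.confidence = parsed.confidence;
      if (parsed.sources !== undefined) result.sources = parsed.sources;
      return result;
    }
  } catch {
    // Unparseable input falls through to the fallback.
  }
  return { ...FALLBACK };
}
```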
Step 11: Add Langfuse observability
Create src/services/langfuse.ts to trace generations, cache hits, embeddings, and clarifications through Langfuse. The service silently no-ops when Langfuse credentials are absent, so observability is optional:
The getLangfuse function lazily initializes the client only when both LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY are set. Every exported trace function checks for a null client before doing work, so the service degrades gracefully without credentials.
Expected output: Four trace functions that log generation, embedding, cache-hit, and clarification events to Langfuse — or silently skip when Langfuse is unconfigured.
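The lazy, credential-gated client described above can be sketched without the Langfuse SDK by injecting a client factory. In this hypothetical version, the Tracer interface stands in for the Langfuse client, createTracing and its factory parameter are illustrative, and only two of the four trace functions are shown; the real service constructs the Langfuse client from the two keys instead.

```typescript
// Hypothetical stand-in for the Langfuse client: anything with a trace() method.
export interface Tracer {
  trace(name: string, metadata: Record<string, unknown>): void;
}

export type TracerFactory = (publicKey: string, secretKey: string) => Tracer;

export function createTracing(env: Record<string, string | undefined>, factory: TracerFactory) {
  // undefined = not checked yet; null = checked, credentials missing.
  let client: Tracer | null | undefined;

  // Build the client on first use, and only when both keys are present.
  function getClient(): Tracer | null {
    if (client === undefined) {
      const publicKey = env.LANGFUSE_PUBLIC_KEY;
      const secretKey = env.LANGFUSE_SECRET_KEY;
      client = publicKey && secretKey ? factory(publicKey, secretKey) : null;
    }
    return client;
  }

  return {
    traceCacheHit(question: string): void {
      const c = getClient();
      if (c === null) return; // unconfigured: silently skip
      c.trace("cache-hit", { question });
    },
    traceClarification(question: string, reason: string): void {
      const c = getClient();
      if (c === null) return;
      c.trace("clarification", { question, reason });
    },
  };
}
```

Checking for null inside every function, rather than failing at import time, is what lets the rest of the pipeline call the trace helpers unconditionally.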
Step 12: Create the chat API route
Create src/api/chat/route.ts — the Express route that orchestrates the full pipeline:
ts
import { Router, type Request, type Response } from "express";
import { z } from "zod";
import type { Prediction } from "@reaatech/confidence-router-core";
import { CohereError, CohereTimeoutError } from "cohere-ai";
import { getLogger } from "@reaatech/agent-memory-core";
import * as memory from "../../services/memory.js";
import * as embedding from "../../services/embedding.js";
import * as db from "../../services/database.js";
import * as confidence from "../../services/confidence.js";
import * as cache from "../../services/cache.js";
import * as cohere from "../../services/cohere.js";
import * as repair from "../../services/repair.js";
import * as langfuse from "../../services/langfuse.js";
The route handler:
Validates the request body with Zod
Loads conversation history for the session
Classifies the query using financial keywords
Routes through the confidence engine — returns clarification or fallback immediately for non-financial queries
Embeds the query and searches pgvector for semantically similar transactions
Checks the semantic cache — returns cached answers without calling Cohere
Calls Cohere with conversation history and relevant transactions as context
Repairs the LLM’s output into a structured AnswerType
Stores the result in the cache and in conversation history
Traces everything through Langfuse
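The keyword classification and confidence routing steps in the list above can be sketched as pure functions. This is a hypothetical illustration: the keyword list, the scoring heuristic, and the 0.6/0.3 thresholds are assumptions, and the tutorial’s confidence service relies on @reaatech/confidence-router-core for the ROUTE/CLARIFY/FALLBACK decision rather than on hand-written thresholds like these.

```typescript
// Hypothetical keyword classifier and router; keywords and thresholds are assumptions.
export type Decision = "ROUTE" | "CLARIFY" | "FALLBACK";

const FINANCIAL_KEYWORDS = [
  "spend", "spent", "expense", "cost", "paid", "payment", "invoice",
  "revenue", "income", "cash flow", "transaction", "budget", "supplies",
];

// Confidence grows with the number of matched keywords, saturating at 2 matches.
export function classify(query: string): { label: "financial" | "other"; confidence: number } {
  const q = query.toLowerCase();
  const hits = FINANCIAL_KEYWORDS.filter((k) => q.includes(k)).length;
  return { label: hits > 0 ? "financial" : "other", confidence: Math.min(hits / 2, 1) };
}

export function route(query: string, routeAt = 0.6, clarifyAt = 0.3): Decision {
  const { confidence } = classify(query);
  if (confidence >= routeAt) return "ROUTE"; // confident: run the full pipeline
  if (confidence >= clarifyAt) return "CLARIFY"; // borderline: ask a follow-up question
  return "FALLBACK"; // non-financial: reply without calling Cohere
}
```

Substring matching is crude (“cost” also matches “costume”, for example), which is one reason the next steps suggest replacing keyword classification with a trained classifier.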
Expected output: An Express router mounted at /api/chat that accepts POST requests with sessionId and message.
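To exercise the route from code rather than the chat UI, a small client could look like the sketch below. The request body fields (sessionId and message) match the route’s contract; the base URL assumes the Express server on port 3001; the response is returned as untyped JSON because its exact shape isn’t specified here. The askAgent name and the injectable fetch parameter are illustrative.

```typescript
// Hypothetical client for POST /api/chat, using the global fetch (Node 18+ and browsers).
export async function askAgent(
  sessionId: string,
  message: string,
  baseUrl = "http://localhost:3001",
  fetchImpl: typeof fetch = fetch,
): Promise<unknown> {
  const res = await fetchImpl(`${baseUrl}/api/chat`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ sessionId, message }),
  });
  if (!res.ok) {
    // Surface the status; the body may carry the route's validation error details.
    throw new Error(`Chat request failed with HTTP ${res.status}: ${await res.text()}`);
  }
  return res.json();
}
```

Reusing the same sessionId across calls is what lets the route load earlier turns for follow-up questions.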
Step 13: Wire the Express server entry point
Create src/index.ts as the application entry point. The Express API runs on port 3001 alongside the Next.js dev server (which serves the chat UI at app/page.tsx):
ts
import "dotenv/config";
import express, { type Request, type Response, type NextFunction } from "express";
import { getLogger } from "@reaatech/agent-memory-core";
import { config } from "./lib/config.js";
import chatRouter from "./api/chat/route.js";
import { pool } from "./services/database.js";

const app = express();
const logger = getLogger();

app.use(express.json());

// Log method, URL, status, and duration once each response finishes.
app.use((req: Request, res: Response, next: NextFunction) => {
  const start = Date.now();
  res.on("finish", () => {
    const duration = Date.now() - start;
    logger.info(`${req.method} ${req.originalUrl} ${String(res.statusCode)} ${String(duration)}ms`);
  });
  next();
});

app.use("/api/chat", chatRouter);

app.get("/health", (_req: Request, res: Response) => {
  res.json({ status: "ok" });
});

// Express recognizes error handlers by their four-argument signature, so _next must stay.
app.use((err: Error, _req: Request, res: Response, _next: NextFunction): void => {
  logger.error("Unhandled error", { error: err.message });
  res.status(500).json({ error: "Internal server error" });
  void _next;
});

const server = app.listen(config.PORT, () => {
  logger.info(`Server listening on port ${String(config.PORT)}`);
});

function gracefulShutdown(signal: string): void {
  logger.info(`Received ${signal}, shutting down gracefully...`);
  // Stop accepting connections, then drain the database pool before exiting.
  server.close(() => {
    void pool
      .end()
      .then(() => {
        logger.info("Server closed and pool ended");
        process.exit(0);
      })
      .catch((err: unknown) => {
        // Without this, a failed pool.end() would leave the process hanging.
        logger.error("Error while ending database pool", { error: String(err) });
        process.exit(1);
      });
  });
}

process.on("SIGTERM", () => {
  gracefulShutdown("SIGTERM");
});
process.on("SIGINT", () => {
  gracefulShutdown("SIGINT");
});

export { app };
export const SCAFFOLD_VERSION = "0.1.0" as const;
The server includes:
JSON body parsing
Request logging middleware with timing
The chat route at /api/chat
A health check endpoint at /health
Global error handler
Graceful shutdown that drains the database pool
Start the Express server:
terminal
node --import tsx src/index.ts
In a separate terminal, start the Next.js dev server for the chat UI:
terminal
pnpm dev
Expected output: An Express server on port 3001 responding to GET /health with {"status":"ok"}, and a Next.js dev server on port 3000 with a chat interface.
Step 14: Run the tests
The project includes a comprehensive test suite with unit tests for every service and integration tests for the chat route. Run:
terminal
pnpm test
Expected output: Vitest runs the test suite and reports all tests passing with 90%+ code coverage across lines, branches, functions, and statements. The test suite covers:
The database service — search, insert, conversation round-trips
The embedding service — query and batch embedding
The memory service — addTurn, getSessionContext, clearSession
The confidence router — ROUTE, CLARIFY, FALLBACK and error paths
The cache service — set, get hit/miss, invalidate
The Cohere client — generateAnswer with MSW-mocked HTTP responses
The repair service — valid JSON, code-fence wrappers, fallback on unrepairable output
The Langfuse tracing service — generation, embedding, cache hit, clarification traces
The chat route — full pipeline, validation errors, timeouts, cache hits
The Express server — health check, graceful shutdown
Next steps
Replace InMemoryAdapter with Redis or PostgreSQL for the cache so the semantic cache survives restarts. The @reaatech/llm-cache package supports pluggable adapters.
Add a streaming endpoint using generateAnswerStream from the Cohere service to stream responses token-by-token to a chat UI.
Replace keyword-based classification with a zero-shot classifier or a fine-tuned model for more accurate financial intent detection.
Deploy with pgvector on a managed PostgreSQL provider (Neon, Supabase, or AWS RDS with the pgvector extension) for production use.
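For the streaming next step, one common approach is Server-Sent Events (SSE), where each chunk from generateAnswerStream becomes a data: frame. The sketch below is hypothetical and framework-agnostic: write stands in for an Express res.write call, and the [DONE] terminator is a convention assumed here, not something the Cohere service emits. In an Express handler you would also set the Content-Type: text/event-stream header before writing.

```typescript
// Format one SSE frame. JSON-encoding the chunk keeps embedded newlines
// from breaking the line-based `data:` framing.
export function toSseFrame(chunk: string): string {
  return `data: ${JSON.stringify(chunk)}\n\n`;
}

// Drain a text stream (for example, generateAnswerStream's output) into SSE frames.
export async function pipeAsSse(
  stream: AsyncIterable<string>,
  write: (frame: string) => void,
): Promise<void> {
  for await (const chunk of stream) {
    write(toSseFrame(chunk));
  }
  write("data: [DONE]\n\n"); // assumed end-of-stream marker for the client
}
```

Because each payload is JSON-encoded, the chat UI has to JSON.parse every data: value except the [DONE] marker.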