Cohere Knowledge Agent for Plaid SMB Financial Insights
Answer questions about your small business finances by connecting your Plaid-linked bank accounts to a natural language Q&A agent powered by Cohere.
The problem: SMB owners waste hours manually combing through bank statements and spreadsheets to answer basic questions about cash flow, vendor spending, and recurring costs.
Example artifact: a complete, working implementation of this recipe, available as a zip or file by file. It was generated by our build pipeline and tested before publishing.
190 kB · 135 tests · 98.4% coverage · vitest passing
SHA-256 2902bf04308b27d984ffbc704ec5c116dec231620fbfa09235c29bf699dcf7f9
© 2026 REAA Technologies Inc. — Open-Source AI Solutions for Small Business.
Intro
This tutorial builds a natural language Q&A agent for small business financial data. You’ll connect Plaid-linked bank accounts, index transactions into a Qdrant vector store, and answer questions like “What was my cash flow last month?” or “How much did I spend on vendors?”, all through a Cohere-powered RAG pipeline. The stack uses Next.js 16 for the API layer, fastembed for local embeddings, and four REAA packages for query routing, session management, output repair, and RAG document validation.
Prerequisites
Node.js >= 22 and pnpm 10 (or npm)
A Plaid sandbox account — sign up at plaid.com and get your client_id, secret, and a sandbox access_token
A Cohere API key — sign up at cohere.com
A Qdrant instance — run locally with docker run -p 6333:6333 qdrant/qdrant or use Qdrant Cloud
A Langfuse account for observability (optional — the app runs without it, but telemetry is helpful)
Familiarity with TypeScript, Next.js App Router route handlers, and basic vector search concepts
Step 1: Scaffold the project
Create an empty project directory and initialize it:
mkdir cohere-plaid-agent && cd cohere-plaid-agent
pnpm init
pnpm init generates a minimal package.json. Replace it with the following, which pins every dependency to an exact version:
{
"name" : "cohere-knowledge-agent-for-plaid-smb-financial-insights" ,
"version" : "0.1.0" ,
"private" : true ,
"scripts" : {
"dev" : "next dev" ,
"build" : "next build" ,
"start" : "next start" ,
"lint" : "eslint ." ,
"typecheck" : "tsc --noEmit" ,
"test" : "vitest run --coverage --reporter=json --outputFile=vitest-report.json"
},
"dependencies" : {
"@qdrant/js-client-rest" : "1.18.0" ,
"@reaatech/confidence-router" : "0.1.1" ,
"@reaatech/hybrid-rag" : "0.1.0" ,
"@reaatech/session-continuity" : "0.1.0" ,
"@reaatech/structured-repair-core" : "1.0.0" ,
"cohere-ai" : "8.0.0" ,
"fastembed" : "2.1.0" ,
"langfuse" : "3.38.20" ,
"next" : "16.2.7" ,
"pdf-parse" : "2.4.5" ,
"plaid" : "42.2.0" ,
"react" : "19.2.4" ,
"react-dom" : "19.2.4" ,
"xlsx" : "0.18.5" ,
"zod" : "4.4.3"
},
"devDependencies" : {
"@types/node" : "20.19.42" ,
"@types/react" : "19.2.17" ,
"@types/react-dom" : "19.2.3" ,
"@vitest/coverage-v8" : "4.1.8" ,
"eslint" : "9.39.4" ,
"eslint-config-next" : "16.2.7" ,
"msw" : "2.14.6" ,
"typescript" : "5.9.3" ,
"typescript-eslint" : "8.60.1" ,
"vitest" : "4.1.8"
},
"type" : "module" ,
"engines" : { "node" : ">=22" },
"packageManager" : "pnpm@10.0.0" ,
"license" : "MIT"
}
Then run pnpm install. Expected output: all 25 direct dependencies (15 runtime, 10 dev) install at exactly the pinned versions, with no ^ or ~ ranges anywhere in package.json.
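To double-check that nothing slipped in as a range, a short script can scan package.json for specifiers that are not exact versions. This is a minimal sketch: findUnpinned is a hypothetical helper, and it treats anything other than a plain MAJOR.MINOR.PATCH version (optionally with a prerelease tag) as unpinned.

```typescript
interface PackageJson {
  dependencies?: Record<string, string>;
  devDependencies?: Record<string, string>;
}

// An exact version: MAJOR.MINOR.PATCH with an optional prerelease tag (e.g. 1.2.3-beta.1).
// Ranges (^, ~, >=), dist-tags such as "latest", and URLs all fail this test.
const EXACT_VERSION = /^\d+\.\d+\.\d+(?:-[0-9A-Za-z.-]+)?$/;

// Hypothetical helper: list every dependency whose specifier is not an exact version.
function findUnpinned(pkg: PackageJson): string[] {
  const all: Record<string, string> = { ...pkg.dependencies, ...pkg.devDependencies };
  return Object.entries(all)
    .filter(([, spec]) => !EXACT_VERSION.test(spec))
    .map(([name, spec]) => `${name}@${spec}`);
}
```

To run it against the real manifest, pass in JSON.parse(readFileSync("package.json", "utf8")) with readFileSync imported from node:fs; for the manifest above it should return an empty array.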
Step 2: Add root config files
Create tsconfig.json with strict TypeScript settings:
{
"compilerOptions" : {
"target" : "ES2017" ,
"lib" : [ "dom" , "dom.iterable" , "esnext" ],
"allowJs" : true ,
"skipLibCheck" : true ,
"strict" : true ,
"noEmit" : true ,
"esModuleInterop" : true ,
"module" : "esnext" ,
"moduleResolution" : "bundler" ,
"resolveJsonModule" : true ,
"isolatedModules" : true ,
"jsx" : "react-jsx" ,
"incremental" : true ,
"plugins" : [
{
"name" : "next"
}
],
"paths" : {
"@/*" : [ "./*" ]
}
},
"include" : [
"next-env.d.ts" ,
"**/*.ts" ,
"**/*.tsx" ,
".next/types/**/*.ts" ,
".next/dev/types/**/*.ts" ,
"**/*.mts"
],
"exclude" : [ "node_modules" ]
}
Create next.config.ts. Next.js 15 and later load src/instrumentation.ts automatically and call its register() function once at server startup, so no experimental flag is needed and the config can stay empty:
import type { NextConfig } from "next";
const nextConfig: NextConfig = {};
export default nextConfig;
Create vitest.config.ts:
import { defineConfig } from "vitest/config";
export default defineConfig ({
test: {
globals: true ,
environment: "node" ,
pool: "threads" ,
coverage: {
provider: "v8" ,
reporter: [ "text" , "json" , "json-summary" ],
reportsDirectory: "./coverage" ,
include: [ "src/**/*.ts" , "app/**/route.ts" ],
exclude: [
"node_modules/**" ,
"dist/**" ,
"coverage/**" ,
"**/*.config.{ts,mjs,js}" ,
"**/*.d.ts" ,
"**/*.test.ts" ,
"**/*.test.tsx" ,
"**/*.tsx" ,
"app/**/layout.ts" ,
"app/**/error.ts" ,
"app/**/loading.ts" ,
"app/**/not-found.ts" ,
],
thresholds: {
lines: 90 ,
branches: 90 ,
functions: 90 ,
statements: 90 ,
},
},
},
});
Create .env.example with placeholders for every integration (copy it to .env.local and fill in real values for local development):
# Env vars used by cohere-knowledge-agent-for-plaid-smb-financial-insights.
# Keep placeholders only; never commit real values.
NODE_ENV=development
# Plaid
PLAID_CLIENT_ID=<your-plaid-client-id>
PLAID_SECRET=<your-plaid-secret>
PLAID_ENVIRONMENT=sandbox
PLAID_ACCESS_TOKEN=<plaid-access-token>
# Cohere
COHERE_API_KEY=<your-cohere-api-key>
# Qdrant
QDRANT_URL=http://localhost:6333
QDRANT_API_KEY=<optional>
# Langfuse
LANGFUSE_PUBLIC_KEY=<pk>
LANGFUSE_SECRET_KEY=<sk>
LANGFUSE_HOST=https://cloud.langfuse.com
Create the App Router shell, app/layout.tsx. It imports ./globals.css, so also create that file (it can start empty):
import type { Metadata } from "next" ;
import { Geist, Geist_Mono } from "next/font/google" ;
import "./globals.css" ;
const geistSans = Geist ({
variable: "--font-geist-sans" ,
subsets: [ "latin" ],
});
const geistMono = Geist_Mono ({
variable: "--font-geist-mono" ,
subsets: [ "latin" ],
});
export const metadata : Metadata = {
title: "Cohere Knowledge Agent — Plaid SMB Financial Insights" ,
description: "Answer SMB financial questions by connecting Plaid bank accounts to a Cohere-powered natural language Q&A agent" ,
};
export default function RootLayout ({
children,
} : Readonly <{
children : React . ReactNode ;
}>) {
return (
<html lang = "en" className ={ `${ geistSans . variable } ${ geistMono . variable }` }>
<body>{ children }</body>
</html>
);
}
Expected output: all config files created. Run pnpm typecheck; it should pass with zero errors on the scaffold.
Step 3: Define the type system
Create the domain types that every service layer will reference. Start with src/types/plaid.ts:
export interface PlaidTransaction {
id : string ;
accountId : string ;
amount : number ;
date : string ;
name : string ;
category : string [];
pending : boolean ;
merchantName : string | null ;
}
export interface PlaidAccount {
id : string ;
name : string ;
type : string ;
subtype : string | null ;
balances : { current : number | null ; available : number | null };
}
export interface PlaidErrorDetail {
errorType : string ;
errorCode : string ;
errorMessage : string ;
}
Create src/types/cohere.ts:
export interface CohereSource {
name : string ;
amount ?: number ;
date ?: string ;
}
export interface CohereAnswer {
answer : string ;
sources : CohereSource [];
confidence : number ;
}
export interface CohereQueryInput {
query : string ;
context : string [];
}
export interface CohereErrorDetail {
statusCode : number ;
message : string ;
body : unknown ;
}
Create src/types/finance.ts:
export interface FinancialQuery {
type : "cash_flow" | "vendor_spending" | "recurring_costs" | "balance" | "transaction_history" | "general" ;
filters ?: Record < string , unknown >;
dateRange ?: { start : string ; end : string };
}
export interface SpendingSummary {
totalIn : number ;
totalOut : number ;
byCategory : Record < string , number >;
byVendor : Record < string , number >;
period : string ;
}
export interface TransactionChunk {
id : string ;
docId : string ;
content : string ;
source : string ;
}
Create src/types/config.ts:
export interface PlaidConfig {
clientId : string ;
secret : string ;
env : string ;
accessToken : string ;
}
export interface CohereConfig {
apiKey : string ;
}
export interface QdrantConfig {
url : string ;
apiKey ?: string ;
}
export interface LangfuseConfig {
publicKey : string ;
secretKey : string ;
host : string ;
}
export interface AppConfig {
plaid : PlaidConfig ;
cohere : CohereConfig ;
qdrant : QdrantConfig ;
langfuse : LangfuseConfig ;
}
Define the request and response types for the API routes in their own module under src/types/:
export interface AskRequest {
query : string ;
sessionId ?: string ;
userId ?: string ;
}
export interface AskResponse {
answer : string ;
sources : { name : string ; amount ?: number ; date ?: string }[];
sessionId : string ;
decision : "ROUTE" | "CLARIFY" | "FALLBACK" ;
target ?: string ;
}
export interface IngestionResponse {
status : string ;
transactionsIngested : number ;
}
export interface ErrorResponse {
error : string ;
detail ?: string ;
}
Create src/types/rag.ts. It imports the REAA package to get the DocumentSchema and shared RAG types:
import { DocumentSchema, ChunkingStrategy, type Document, type Chunk, type ChunkingConfig, type RetrievalResult, type HybridResult } from "@reaatech/hybrid-rag" ;
export function validateDocument (doc : unknown ) : Document {
return DocumentSchema. parse (doc);
}
export { DocumentSchema, ChunkingStrategy };
export type { Document, Chunk, ChunkingConfig, RetrievalResult, HybridResult };
Expected output: pnpm typecheck still passes. The type modules don't import one another; only rag.ts depends on an external package.
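The SpendingSummary type doesn't say how it gets filled. One way, sketched below, is a single pass over transactions that splits inflows from outflows and totals outflows by category and vendor. It assumes Plaid's sign convention (a positive amount is money leaving the account, a negative amount is money coming in); summarizeSpending, the "uncategorized" bucket, and the choice to skip pending transactions are hypothetical design decisions, and the interfaces are trimmed copies of the ones above so the block runs on its own.

```typescript
interface PlaidTransaction {
  id: string;
  amount: number; // Plaid convention: positive = money out, negative = money in
  date: string;
  name: string;
  category: string[];
  pending: boolean;
  merchantName: string | null;
}

interface SpendingSummary {
  totalIn: number;
  totalOut: number;
  byCategory: Record<string, number>;
  byVendor: Record<string, number>;
  period: string;
}

// Round to cents so repeated float additions don't drift (e.g. 0.1 + 0.2).
const cents = (n: number): number => Math.round(n * 100) / 100;

// Hypothetical helper: aggregate settled transactions into a SpendingSummary.
function summarizeSpending(txs: PlaidTransaction[], period: string): SpendingSummary {
  const summary: SpendingSummary = { totalIn: 0, totalOut: 0, byCategory: {}, byVendor: {}, period };
  for (const tx of txs) {
    if (tx.pending) continue; // pending transactions can still change or disappear
    if (tx.amount < 0) {
      summary.totalIn = cents(summary.totalIn - tx.amount); // negative amount = inflow
      continue;
    }
    summary.totalOut = cents(summary.totalOut + tx.amount);
    const category = tx.category[0] ?? "uncategorized"; // top-level category only
    const vendor = tx.merchantName ?? tx.name;
    summary.byCategory[category] = cents((summary.byCategory[category] ?? 0) + tx.amount);
    summary.byVendor[vendor] = cents((summary.byVendor[vendor] ?? 0) + tx.amount);
  }
  return summary;
}
```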
Step 4: Build the config loader
Create src/config/cohere-config.ts:
import { type CohereConfig } from "../types/config.js" ;
export function getCohereConfig () : CohereConfig {
const apiKey = process.env.COHERE_API_KEY;
if ( ! apiKey) {
throw new Error ( "COHERE_API_KEY required" );
}
return { apiKey };
}
Create src/config/qdrant-config.ts:
import { type QdrantConfig } from "../types/config.js" ;
export function getQdrantConfig () : QdrantConfig {
return {
url: process.env.QDRANT_URL ?? "http://localhost:6333" ,
apiKey: process.env.QDRANT_API_KEY || undefined, // treat an empty value as "no key"
};
}
Create src/config/index.ts, the master config loader that reads every env var:
import { type AppConfig } from "../types/config.js" ;
import { getCohereConfig } from "./cohere-config.js" ;
import { getQdrantConfig } from "./qdrant-config.js" ;
export function loadConfig () : AppConfig {
const plaidClientId = process.env.PLAID_CLIENT_ID;
if ( ! plaidClientId) {
throw new Error ( "PLAID_CLIENT_ID required" );
}
const plaidSecret = process.env.PLAID_SECRET;
if ( ! plaidSecret) {
throw new Error ( "PLAID_SECRET required" );
}
const plaidAccessToken = process.env.PLAID_ACCESS_TOKEN;
if ( ! plaidAccessToken) {
throw new Error ( "PLAID_ACCESS_TOKEN required" );
}
  // Langfuse is optional (see Prerequisites): missing keys become empty strings,
  // and src/instrumentation.ts only initializes Langfuse when both keys are set.
  return {
    plaid: {
      clientId: plaidClientId,
      secret: plaidSecret,
      env: process.env.PLAID_ENVIRONMENT ?? "sandbox",
      accessToken: plaidAccessToken,
    },
    cohere: getCohereConfig(),
    qdrant: getQdrantConfig(),
    langfuse: {
      publicKey: process.env.LANGFUSE_PUBLIC_KEY ?? "",
      secretKey: process.env.LANGFUSE_SECRET_KEY ?? "",
      host: process.env.LANGFUSE_HOST ?? "https://cloud.langfuse.com",
    },
  };
}
Expected output: pnpm typecheck passes. loadConfig() throws a descriptive error if any required Plaid or Cohere env var is missing, while the Langfuse settings stay optional.
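The loader stops at the first missing variable, so fixing a fresh .env can take several restarts. A variation is to check every required name up front and report them all in one error. readRequiredEnv is a hypothetical helper, sketched against a plain record so it can be tested without touching process.env.

```typescript
type Env = Record<string, string | undefined>;

// Hypothetical helper: return the requested variables, or throw one error naming
// every variable that is missing or empty.
function readRequiredEnv<K extends string>(env: Env, names: readonly K[]): Record<K, string> {
  const missing = names.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing required env vars: ${missing.join(", ")}`);
  }
  const values = {} as Record<K, string>;
  for (const name of names) {
    values[name] = env[name] as string; // safe: every name passed the check above
  }
  return values;
}
```

In loadConfig() this could replace the individual checks, e.g. readRequiredEnv(process.env, ["PLAID_CLIENT_ID", "PLAID_SECRET", "PLAID_ACCESS_TOKEN"] as const).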
Step 5: Connect to Plaid
The Plaid integration has four modules: client setup, transaction fetching, account lookup, and transformers.
Create src/services/plaid/client.ts:
import { Configuration, PlaidApi, PlaidEnvironments } from "plaid" ;
import { type PlaidConfig } from "../../types/config.js" ;
export function createPlaidClient(config: PlaidConfig): PlaidApi {
  // Honor PLAID_ENVIRONMENT instead of hardcoding sandbox; unknown values fall back to sandbox.
  const basePath =
    (PlaidEnvironments as Record<string, string | undefined>)[config.env] ?? PlaidEnvironments.sandbox;
  const configuration = new Configuration({
    basePath,
    baseOptions: {
      headers: {
        "PLAID-CLIENT-ID": config.clientId,
        "PLAID-SECRET": config.secret,
        "Plaid-Version": "2020-09-14",
      },
    },
  });
  return new PlaidApi(configuration);
}
Create src/services/plaid/transactions.ts. It wraps transactionsSync with typed mapping and error extraction:
import { type PlaidApi } from "plaid";
import { type PlaidTransaction, type PlaidErrorDetail } from "../../types/plaid.js";
export interface TransactionsSyncResult {
  added: PlaidTransaction[];
  modified: PlaidTransaction[];
  removed: PlaidTransaction[];
  nextCursor: string | null;
  hasMore: boolean; // true when more pages remain; call again with nextCursor
}
function mapTransaction(tx: {
  transaction_id: string;
  account_id: string;
  amount: number;
  date: string;
  name: string;
  category?: string[] | null;
  pending: boolean;
  merchant_name?: string | null;
}): PlaidTransaction {
  return {
    id: tx.transaction_id,
    accountId: tx.account_id,
    amount: tx.amount,
    date: tx.date,
    name: tx.name,
    category: tx.category ?? [],
    pending: tx.pending,
    merchantName: tx.merchant_name ?? null,
  };
}
// Removed entries only carry an ID, so the remaining fields are placeholders.
function mapRemoved(r: { transaction_id: string }): PlaidTransaction {
  return { id: r.transaction_id, accountId: "", amount: 0, date: "", name: "", category: [], pending: false, merchantName: null };
}
export async function fetchTransactions(
  client: PlaidApi,
  accessToken: string,
  cursor?: string,
): Promise<TransactionsSyncResult> {
  try {
    const response = await client.transactionsSync({ access_token: accessToken, cursor });
    const data = response.data;
    return {
      added: data.added.map(mapTransaction),
      modified: data.modified.map(mapTransaction),
      removed: data.removed.map(mapRemoved),
      nextCursor: data.next_cursor,
      hasMore: data.has_more,
    };
  } catch (error: unknown) {
    // Plaid error bodies use snake_case keys: error_type, error_code, error_message.
    const body = (error as {
      response?: { data?: { error_type?: string; error_code?: string; error_message?: string } };
    }).response?.data;
    if (body?.error_type) {
      const detail: PlaidErrorDetail = {
        errorType: body.error_type,
        errorCode: body.error_code ?? "",
        errorMessage: body.error_message ?? "",
      };
      const typedError = new Error(`Plaid error: ${detail.errorType} - ${detail.errorCode} - ${detail.errorMessage}`);
      Object.assign(typedError, { error_type: detail.errorType, error_code: detail.errorCode });
      throw typedError;
    }
    throw error;
  }
}
Create src/services/plaid/accounts.ts:
import { type PlaidApi } from "plaid" ;
import { type PlaidAccount } from "../../types/plaid.js" ;
function mapAccount (acct : {
account_id : string ;
name : string ;
type : string ;
subtype ?: string | null ;
balances : { current ?: number | null ; available ?: number | null };
}) : PlaidAccount {
return {
id: acct.account_id,
name: acct.name,
type: acct.type,
subtype: acct.subtype ?? null ,
balances: {
current: acct.balances.current ?? null ,
available: acct.balances.available ?? null ,
},
};
}
export async function fetchAccounts (
client : PlaidApi ,
accessToken : string ,
) : Promise < PlaidAccount []> {
const response = await client. accountsGet ({
access_token: accessToken,
});
return response.data.accounts. map (mapAccount);
}
Create src/services/plaid/transformer.ts. It produces the chunked text that will be embedded:
import { type TransactionChunk } from "../../types/finance.js" ;
import { type PlaidTransaction } from "../../types/plaid.js" ;
export function transformTransaction (tx : PlaidTransaction ) : TransactionChunk {
return {
id: tx.id,
docId: tx.accountId,
content: chunkTransaction (tx),
source: "plaid" ,
};
}
export function chunkTransaction (tx : PlaidTransaction ) : string {
const categoryStr = tx.category.length > 0 ? tx.category. join ( ", " ) : "uncategorized" ;
const merchant = tx.merchantName ?? tx.name;
return `[${ tx . date }] ${ tx . name } $${ tx . amount . toFixed ( 2 ) } ${ categoryStr } — ${ merchant }` ;
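}
Expected output: TypeScript compiles. The Plaid client can be instantiated with config, and fetchTransactions returns typed transactions with cursor support. Keep Plaid's sign convention in mind when reading amounts: positive values are money leaving the account, negative values are money coming in.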
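Plaid's /transactions/sync endpoint is paginated: each call returns one page of changes plus next_cursor and has_more, and the first sync of an account with history typically needs several calls. The loop below is a minimal sketch of draining every page before using the results. It assumes a wrapper that also surfaces data.has_more as hasMore; SyncPage and syncAllPages are hypothetical names, and fetchPage stands in for something like a call to fetchTransactions with the current cursor.

```typescript
interface SyncPage<T> {
  added: T[];
  modified: T[];
  removed: T[];
  nextCursor: string | null;
  hasMore: boolean;
}

// Keep calling the sync endpoint until hasMore is false, accumulating every page.
async function syncAllPages<T>(
  fetchPage: (cursor?: string) => Promise<SyncPage<T>>,
  startCursor?: string,
): Promise<{ added: T[]; modified: T[]; removed: T[]; cursor: string | undefined }> {
  const added: T[] = [];
  const modified: T[] = [];
  const removed: T[] = [];
  let cursor = startCursor;
  let hasMore = true;
  while (hasMore) {
    const page = await fetchPage(cursor);
    added.push(...page.added);
    modified.push(...page.modified);
    removed.push(...page.removed);
    cursor = page.nextCursor ?? cursor; // advance to the cursor for the next page
    hasMore = page.hasMore;
  }
  return { added, modified, removed, cursor };
}
```

Store the returned cursor only after the whole loop succeeds, and pass it as startCursor on the next run so Plaid returns only changes made since then.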
Step 6: Build the embedding and vector store layer
Create src/services/embeddings/embedder.ts:
import { EmbeddingModel, FlagEmbedding } from "fastembed" ;
type EmbedderModel = FlagEmbedding ;
export async function initEmbedder () : Promise < EmbedderModel > {
return await FlagEmbedding. init ({ model: EmbeddingModel.BGEBaseEN });
}
export async function* embedDocuments (
model : EmbedderModel ,
texts : string [],
batchSize = 256 ,
) : AsyncGenerator < number [][]> {
const embeddingIterable = model. embed (texts, batchSize);
for await ( const batch of embeddingIterable) {
yield batch;
}
}
export async function embedQuery (
model : EmbedderModel ,
text : string ,
) : Promise < number []> {
return await model. queryEmbed (text);
}
Create src/services/vector-store/qdrant-client.ts:
import { QdrantClient } from "@qdrant/js-client-rest" ;
import { type QdrantConfig } from "../../types/config.js" ;
export function createQdrantClient (config : QdrantConfig ) : QdrantClient {
return new QdrantClient ({
url: config.url,
apiKey: config.apiKey,
});
}
export async function ensureCollection (
client : QdrantClient ,
name : string ,
vectorSize : number ,
) : Promise < void > {
const collections = await client. getCollections ();
const exists = collections.collections. some ((c) => c.name === name);
if ( ! exists) {
await client. createCollection (name, {
vectors: { size: vectorSize, distance: "Cosine" },
});
}
}
export async function upsertPoints(
  client: QdrantClient,
  collection: string,
  // Qdrant accepts only unsigned integers or UUID strings as point IDs.
  points: { id: string; vector: number[]; payload: Record<string, unknown> }[],
): Promise<void> {
  await client.upsert(collection, { points, wait: true });
}
export async function searchPoints (
client : QdrantClient ,
collection : string ,
vector : number [],
topK = 5 ,
) : Promise <
{ id : string ; score : number ; payload : Record < string , unknown > }[]
> {
const results = await client. search (collection, {
vector,
limit: topK,
with_payload: true ,
});
return results. map ((r) => ({
id: String (r.id),
score: r.score,
payload: r.payload ?? {},
}));
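}
Expected output: TypeScript compiles. The Qdrant client connects to http://localhost:6333 (or whatever URL you configured). When you call ensureCollection, pass a vectorSize of 768, the output dimension of the BGE-base-en model that initEmbedder loads.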
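The collection is created with distance "Cosine", so Qdrant ranks points by the cosine similarity between the query vector and each stored vector. The brute-force version below shows what that score means on a handful of vectors. It is only an illustration: Qdrant uses an approximate HNSW index rather than scanning every point, and cosine and topKCosine are hypothetical names.

```typescript
interface ScoredPoint {
  id: string;
  score: number;
}

// Cosine similarity: dot(a, b) / (|a| * |b|), in [-1, 1]; 1 means same direction.
function cosine(a: number[], b: number[]): number {
  if (a.length !== b.length) throw new Error("dimension mismatch");
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i]! * b[i]!;
    normA += a[i]! * a[i]!;
    normB += b[i]! * b[i]!;
  }
  if (normA === 0 || normB === 0) return 0; // zero vectors have no direction
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// Exhaustive search: score every point, sort descending, keep the topK best.
function topKCosine(query: number[], points: { id: string; vector: number[] }[], topK = 5): ScoredPoint[] {
  return points
    .map((p) => ({ id: p.id, score: cosine(query, p.vector) }))
    .sort((x, y) => y.score - x.score)
    .slice(0, topK);
}
```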
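Step 7: Connect to Cohere
Create src/services/cohere/client.ts:
import { CohereClientV2 } from "cohere-ai";
import { getCohereConfig } from "../../config/cohere-config.js";
// Pass the key explicitly: without a token the SDK falls back to the CO_API_KEY
// env var, not COHERE_API_KEY.
export function createCohereClient(): CohereClientV2 {
  return new CohereClientV2({ token: getCohereConfig().apiKey });
}
Create src/services/cohere/chat.ts: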
import { CohereClientV2 } from "cohere-ai" ;
import type { TransactionChunk } from "../../types/finance.js" ;
export interface ChatResult {
answer : string ;
usage : { inputTokens : number ; outputTokens : number };
}
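// The RAG pipeline repairs and parses the reply as { answer, sources, confidence },
// so ask the model for that JSON shape explicitly.
const SYSTEM_PROMPT = [
  "You answer questions about a small business's finances using only the transactions provided.",
  'Reply with JSON only: {"answer": string, "sources": [{"name": string, "amount"?: number, "date"?: string}], "confidence": number between 0 and 1}.',
].join("\n");
export async function answerQuestion(
  client: CohereClientV2,
  query: string,
  context: string,
): Promise<ChatResult> {
  const response = await client.chat({
    model: "command-a-03-2025",
    messages: [
      { role: "system", content: `${SYSTEM_PROMPT}\n\n${context}` },
      { role: "user", content: query },
    ],
  });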
const messageContent = response.message.content;
let answer = "" ;
if (messageContent && messageContent.length > 0 ) {
const first = messageContent[ 0 ];
if (first.type === "text" ) {
answer = first.text;
}
}
const usage = {
inputTokens: response.usage?.billedUnits?.inputTokens ?? 0 ,
outputTokens: response.usage?.billedUnits?.outputTokens ?? 0 ,
};
return { answer, usage };
}
export function formatContext (chunks : TransactionChunk []) : string {
const content = chunks. map ((chunk) => chunk.content). join ( "\n" );
return chunks.length > 0 ? `Relevant transaction history:\n${ content }` : "No transaction history found." ;
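}
Create src/services/cohere/stream.ts for streaming responses. It adds the "Relevant transaction history:" header itself, so pass it raw chunk text rather than formatContext() output: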
import { CohereClientV2 } from "cohere-ai" ;
export async function* answerQuestionStream (
client : CohereClientV2 ,
query : string ,
context : string
) : AsyncGenerator < string > {
const stream = await client. chatStream ({
model: "command-a-03-2025" ,
messages: [
{ role: "system" , content: `Relevant transaction history:\n${ context }` },
{ role: "user" , content: query },
],
});
for await ( const event of stream) {
if (event.type === "content-delta" ) {
yield event.delta?.message?.content?.text ?? "" ;
}
}
}
Create src/services/cohere/errors.ts:
import { CohereError, CohereTimeoutError } from "cohere-ai" ;
import { type CohereErrorDetail } from "../../types/cohere.js" ;
export function handleCohereError (error : unknown ) : CohereErrorDetail {
if (error instanceof CohereTimeoutError ) {
return {
statusCode: 408 ,
message: error.message,
body: error,
};
}
if (error instanceof CohereError ) {
return {
statusCode: error.statusCode ?? 500 ,
message: error.message,
body: error.body ?? error,
};
}
return {
statusCode: 500 ,
message: error instanceof Error ? error.message : "Unknown error" ,
body: error,
};
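}
Expected output: TypeScript compiles. Note that when no token is passed, CohereClientV2 reads the CO_API_KEY environment variable, not COHERE_API_KEY, so either pass getCohereConfig().apiKey as the token option or also set CO_API_KEY.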
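handleCohereError reduces any failure to a status code, which is enough to decide whether a call is worth retrying: timeouts (408), rate limits (429), and server errors (5xx) are usually transient, while other 4xx errors are not. The wrapper below is a hypothetical sketch of retrying with exponential backoff based on such a code; withRetry, RetryOptions, and the getStatus callback are assumptions, not part of the cohere-ai SDK.

```typescript
interface RetryOptions {
  retries: number; // extra attempts after the first call
  baseDelayMs: number; // wait before retry n is baseDelayMs * 2^n
}

const isRetryable = (status: number): boolean => status === 408 || status === 429 || status >= 500;

// Hypothetical helper: retry fn on transient errors with exponential backoff.
async function withRetry<T>(
  fn: () => Promise<T>,
  getStatus: (error: unknown) => number,
  { retries, baseDelayMs }: RetryOptions,
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (error: unknown) {
      // Give up on non-transient errors or once the retry budget is spent.
      if (attempt >= retries || !isRetryable(getStatus(error))) throw error;
      const delay = baseDelayMs * 2 ** attempt;
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}
```

In this project it could wrap a chat call as withRetry(() => answerQuestion(client, query, context), (e) => handleCohereError(e).statusCode, { retries: 3, baseDelayMs: 500 }); the retry count and delay are illustrative values, not tuned recommendations.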
Step 8: Build the confidence router
The confidence router decides whether a user query is clear enough to answer directly, ambiguous (needs clarification), or too vague to handle.
Create src/services/router/decision-router.ts:
import { ConfidenceRouter, KeywordClassifier } from "@reaatech/confidence-router" ;
export interface RouterDecision {
type : "ROUTE" | "CLARIFY" | "FALLBACK" ;
target ?: string ;
prompt ?: string ;
}
let routerInstance : ConfidenceRouter | undefined ;
function getRouter () : ConfidenceRouter {
if ( ! routerInstance) {
routerInstance = new ConfidenceRouter ({
routeThreshold: 0.8 ,
fallbackThreshold: 0.3 ,
clarificationEnabled: true ,
});
routerInstance. registerClassifier (
new KeywordClassifier ([
{
label: "cash_flow" ,
keywords: [ "cash flow" , "income" , "revenue" , "money in" , "money out" ],
},
{
label: "vendor_spending" ,
keywords: [ "vendor" , "spending" , "paid to" , "supplier" , "merchant" ],
},
{
label: "recurring_costs" ,
keywords: [ "recurring" , "subscription" , "monthly" , "bill" , "repeating" ],
},
{
label: "balance" ,
keywords: [ "balance" , "how much" , "account" , "savings" ],
},
{
label: "transaction_history" ,
keywords: [ "history" , "recent" , "last" , "list" , "show" ],
},
{
label: "general" ,
keywords: [ "help" , "what can you" , "explain" ],
},
]),
);
}
return routerInstance;
}
export async function decideQuery (query : string ) : Promise < RouterDecision > {
try {
const trimmed = query. trim (). toLowerCase ();
if ( ! trimmed) {
return { type: "FALLBACK" };
}
const decision = await getRouter (). process (query);
const type = decision.type === ( "ROUTE" as string ) ? "ROUTE" :
decision.type === ( "CLARIFY" as string ) ? "CLARIFY" : "FALLBACK" as "ROUTE" | "CLARIFY" | "FALLBACK" ;
return {
type,
target: decision.target ?? undefined ,
prompt: decision.prompt ?? undefined ,
};
} catch {
return { type: "FALLBACK" };
}
}
Expected output: TypeScript compiles. The query “what is my cash flow” routes to cash_flow; “money” (ambiguous) returns a clarification prompt; an empty string falls back.
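The scoring inside @reaatech/confidence-router isn't shown here, but the two thresholds suggest three bands: at or above routeThreshold the top label is routed, below fallbackThreshold the query falls back, and anything in between asks for clarification. The sketch below illustrates that reading with a deliberately naive confidence, the top label's share of all keyword hits. Both the scoring rule and the decide function are assumptions for illustration, not the package's algorithm.

```typescript
interface KeywordLabel {
  label: string;
  keywords: string[];
}

type Decision =
  | { type: "ROUTE"; target: string; confidence: number }
  | { type: "CLARIFY"; candidates: string[]; confidence: number }
  | { type: "FALLBACK"; confidence: number };

// Hypothetical router: confidence = top label's keyword hits / all keyword hits.
function decide(query: string, labels: KeywordLabel[], routeThreshold = 0.8, fallbackThreshold = 0.3): Decision {
  const text = query.toLowerCase();
  const hits = labels
    .map(({ label, keywords }) => ({ label, count: keywords.filter((k) => text.includes(k)).length }))
    .filter((h) => h.count > 0)
    .sort((a, b) => b.count - a.count); // stable sort keeps registration order on ties
  const total = hits.reduce((sum, h) => sum + h.count, 0);
  const top = hits[0];
  if (!top || total === 0) return { type: "FALLBACK", confidence: 0 };
  const confidence = top.count / total;
  if (confidence >= routeThreshold) return { type: "ROUTE", target: top.label, confidence };
  if (confidence < fallbackThreshold) return { type: "FALLBACK", confidence };
  return { type: "CLARIFY", candidates: hits.slice(0, 2).map((h) => h.label), confidence };
}
```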
Step 9: Build the session manager
Conversational Q&A needs multi-turn context. The session manager wraps @reaatech/session-continuity with an in-memory store.
Create src/services/session/manager.ts:
import {
SessionManager,
type IStorageAdapter,
type TokenCounter,
type Session,
type Message,
type SessionId,
type MessageId,
type HealthStatus,
} from "@reaatech/session-continuity" ;
class InMemoryStorage implements IStorageAdapter {
private sessions = new Map < string , Session >();
private messages = new Map < string , Message []>();
async createSession (
session : Omit <
Expected output: TypeScript compiles. Sessions auto-compress when they exceed the 4096-token budget using a sliding window.
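The session code above is cut off before its compression logic, so what follows is not the package's implementation. It is a hypothetical sketch of a sliding-window policy under a 4096-token budget: keep any system messages, then walk backwards from the newest message and keep turns until the budget runs out. Token counts use a rough four-characters-per-token estimate, which is an assumption; a real TokenCounter would use the model's tokenizer.

```typescript
interface ChatMessage {
  role: "system" | "user" | "assistant";
  content: string;
}

// Rough estimate: about 4 characters per token for English text (assumption).
const estimateTokens = (text: string): number => Math.ceil(text.length / 4);

// Hypothetical sliding window: system messages always survive; the newest turns
// are kept until adding the next-older one would exceed the budget.
function slidingWindow(messages: ChatMessage[], budget = 4096, count = estimateTokens): ChatMessage[] {
  const system = messages.filter((m) => m.role === "system");
  let remaining = budget - system.reduce((sum, m) => sum + count(m.content), 0);
  const kept: ChatMessage[] = [];
  for (let i = messages.length - 1; i >= 0; i--) {
    const m = messages[i]!;
    if (m.role === "system") continue;
    const cost = count(m.content);
    if (cost > remaining) break; // stop at the first turn that doesn't fit
    remaining -= cost;
    kept.unshift(m); // preserve chronological order
  }
  return [...system, ...kept];
}
```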
Step 10: Build the output repair system
Cohere may return JSON wrapped in markdown fences, with trailing commas, or with missing fields. The structured repair engine fixes what it can and throws UnrepairableError when it can't.
Create src/services/repair/answer-repair.ts:
import { z } from "zod" ;
import { repair, repairOutput, isValid, analyzeInput, UnrepairableError } from "@reaatech/structured-repair-core" ;
export const CohereAnswerSchema = z. object ({
answer: z. string (),
sources: z. array (
z. object ({
name: z. string (),
amount: z. number (). optional (),
date: z. string (). optional (),
}),
),
confidence: z. number (),
});
export async function repairCohereOutput (raw : string ) : Promise < z . infer < typeof CohereAnswerSchema>> {
const analysis = analyzeInput (raw);
if ( ! analysis.isValidJson) {
console. warn ( "Cohere output has JSON issues" , analysis.issues);
}
try {
return await repair (CohereAnswerSchema, raw);
} catch (error : unknown ) {
if (error instanceof UnrepairableError ) {
console. warn ( "Repair failed — partial data:" , error.context?.partialData, "field errors:" , error.context?.fieldErrors);
}
throw error;
}
}
export function repairCohereOutputWithDebug (raw : string ) {
return repairOutput ({
schema: CohereAnswerSchema,
input: raw,
debug: true ,
strategies: [ "strip-fences" , "fix-json-syntax" , "coerce-types" ],
});
}
export function validateCohereOutput (data : unknown ) : boolean {
try {
return isValid (CohereAnswerSchema, JSON. stringify (data));
} catch {
return false ;
}
}
Expected output: A malformed Cohere output like ```json\n{ "answer": "ok", "sources": [], "confidence": "0.9" }\n``` gets stripped to valid JSON with type coercion.
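For intuition about the strip-fences, fix-json-syntax, and coerce-types strategies, here is a hand-rolled and much narrower version of those three passes. It is a hypothetical sketch, not how @reaatech/structured-repair-core works: it removes one surrounding code fence, strips trailing commas, and converts a numeric-string confidence into a number.

```typescript
interface RepairedAnswer {
  answer: string;
  sources: unknown[];
  confidence: number;
}

function naiveRepair(raw: string): RepairedAnswer {
  // 1. strip-fences: drop an opening fence (with optional json tag) and a closing fence.
  let text = raw.trim().replace(/^`{3}(?:json)?\s*/i, "").replace(/\s*`{3}$/, "");
  // 2. fix-json-syntax, narrowly: remove trailing commas before } or ].
  //    Caution: this regex would also alter such text inside string values.
  text = text.replace(/,\s*([}\]])/g, "$1");
  const parsed = JSON.parse(text) as Record<string, unknown>;
  const answer = parsed.answer;
  const sources = parsed.sources;
  // 3. coerce-types: accept confidence sent as a numeric string.
  const confidence = typeof parsed.confidence === "string" ? Number(parsed.confidence) : parsed.confidence;
  if (typeof answer !== "string" || !Array.isArray(sources) || typeof confidence !== "number" || Number.isNaN(confidence)) {
    throw new Error("Output does not match { answer, sources, confidence }");
  }
  return { answer, sources, confidence };
}
```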
Step 11: Wire up the RAG pipeline
The RAG pipeline connects everything: embed queries, search Qdrant, call Cohere with context, and repair the output.
Create src/services/rag/pipeline.ts:
import { DocumentSchema } from "@reaatech/hybrid-rag" ;
import type { TransactionChunk } from "../../types/finance.js" ;
import type { AppConfig } from "../../types/config.js" ;
import { answerQuestion, formatContext } from "../cohere/chat.js" ;
import { createCohereClient } from "../cohere/client.js" ;
import { initEmbedder, embedQuery } from "../embeddings/embedder.js" ;
import { createQdrantClient, searchPoints } from "../vector-store/qdrant-client.js" ;
import { loadConfig } from "../../config/index.js" ;
import { repairCohereOutput } from "../repair/answer-repair.js" ;
// Loading the ONNX embedding model is slow, so load it once and share it across requests.
let embedderPromise: ReturnType<typeof initEmbedder> | undefined;
function getEmbedder(): ReturnType<typeof initEmbedder> {
  if (!embedderPromise) {
    embedderPromise = initEmbedder().catch((error: unknown) => {
      embedderPromise = undefined; // allow a retry after a failed load
      throw error;
    });
  }
  return embedderPromise;
}
export async function retrieve(query: string, topK = 5, config?: AppConfig): Promise<TransactionChunk[]> {
  const resolvedConfig = config ?? loadConfig();
  const embedder = await getEmbedder();
  const qdrantClient = createQdrantClient(resolvedConfig.qdrant);
  const vector = await embedQuery(embedder, query);
  const results = await searchPoints(qdrantClient, "plaid-transactions", vector, topK);
  return results.map((r) => {
    const p = r.payload as { date?: string; name?: string; amount?: number; category?: string[]; merchantName?: string | null };
    return {
      id: r.id,
      docId: r.id,
      content: [p.date, p.name, p.amount != null ? `$${String(p.amount)}` : "", (p.category ?? []).join(", "), p.merchantName].filter(Boolean).join(" — "),
      source: "plaid",
    };
  });
}
export interface RAGPipeline {
answer (query : string , context ?: string ) : Promise <{ answer : string ; sources : Array <{ name : string ; amount ?: number ; date ?: string }> }>;
retrieve (query : string , topK ?: number ) : Promise < TransactionChunk []>;
}
let pipelineInstance : RAGPipeline | undefined ;
export function createRagPipeline (config ?: AppConfig ) : RAGPipeline {
if ( ! pipelineInstance) {
pipelineInstance = buildPipeline (config);
}
return pipelineInstance;
}
function buildPipeline (config ?: AppConfig ) : RAGPipeline {
DocumentSchema. parse ({ id: "pipeline-init" , content: "RAG pipeline initialized" , source: "https://plaid.com/transactions" });
const cohereClient = createCohereClient ();
return {
retrieve (query, topK = 5 ) {
return retrieve (query, topK, config);
},
async answer (query, context) {
const chunks = await retrieve (query, 5 , config);
const formattedContext = formatContext (chunks);
const fullContext = context ? `${ context }\n\n${ formattedContext }` : formattedContext;
const result = await answerQuestion (cohereClient, query, fullContext);
let repairedAnswer : string ;
let repairedSources : Array <{ name : string ; amount ?: number ; date ?: string }> = [];
try {
const repaired = await repairCohereOutput (result.answer);
repairedAnswer = repaired.answer;
repairedSources = repaired.sources;
} catch {
repairedAnswer = result.answer;
repairedSources = chunks. map ((c) => ({ name: c.content. split ( " — " )[ 1 ] || c.content }));
}
return { answer: repairedAnswer, sources: repairedSources };
},
};
}
Expected output: TypeScript compiles. The pipeline is a singleton: createRagPipeline() returns the same instance on repeated calls, so the config passed on the first call is the one every later call uses.
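Because retrieve and answer reach for live services, they are awkward to unit-test. One option is to pass the four stages in as functions so the orchestration can run against fakes. RagDeps and answerWithRag are hypothetical names; the context format copies formatContext, and the fallback mirrors the catch block above by returning the raw model text when repair fails.

```typescript
interface Chunk {
  id: string;
  content: string;
}

interface Answer {
  answer: string;
  sources: { name: string }[];
}

// Each stage is injected, so tests can swap in fakes for Qdrant, fastembed, and Cohere.
interface RagDeps {
  embed: (text: string) => Promise<number[]>;
  search: (vector: number[], topK: number) => Promise<Chunk[]>;
  chat: (query: string, context: string) => Promise<string>;
  repair: (raw: string) => Promise<Answer>;
}

async function answerWithRag(query: string, deps: RagDeps, topK = 5): Promise<Answer> {
  const vector = await deps.embed(query);
  const chunks = await deps.search(vector, topK);
  const context =
    chunks.length > 0
      ? `Relevant transaction history:\n${chunks.map((c) => c.content).join("\n")}`
      : "No transaction history found.";
  const raw = await deps.chat(query, context);
  try {
    return await deps.repair(raw);
  } catch {
    // Repair failed: return the raw text and cite the retrieved chunks instead.
    return { answer: raw, sources: chunks.map((c) => ({ name: c.content })) };
  }
}
```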
Step 12: Build the transaction ingestion pipeline
The ingestion service fetches transactions from Plaid, chunks them, validates each chunk with DocumentSchema, embeds them, and upserts the results to Qdrant.
Create src/ingestion/plaid-ingest.ts:
import { createHash } from "node:crypto";
import type { FlagEmbedding } from "fastembed";
import type { QdrantClient } from "@qdrant/js-client-rest";
import type { PlaidApi } from "plaid";
import { fetchTransactions } from "../services/plaid/transactions.js";
import { chunkTransaction } from "../services/plaid/transformer.js";
import { validateDocument } from "../types/rag.js";
import { embedDocuments } from "../services/embeddings/embedder.js";
// Qdrant point IDs must be unsigned integers or UUIDs, and Plaid transaction IDs are
// neither, so derive a stable UUID-formatted ID from a SHA-256 hash of the transaction ID.
function toPointId(transactionId: string): string {
  const hex = createHash("sha256").update(transactionId).digest("hex");
  return [hex.slice(0, 8), hex.slice(8, 12), hex.slice(12, 16), hex.slice(16, 20), hex.slice(20, 32)].join("-");
}
export async function ingestPlaidTransactions(
  client: PlaidApi,
  embedder: FlagEmbedding,
  qdrantClient: QdrantClient,
  collectionName: string,
  accessToken: string,
  cursor?: string,
): Promise<{ transactionsIngested: number; nextCursor: string | null }> {
  try {
    const result = await fetchTransactions(client, accessToken, cursor);
    const added = result.added;
    if (added.length === 0) {
      return { transactionsIngested: 0, nextCursor: result.nextCursor };
    }
    // Validate every chunk against DocumentSchema before spending time on embeddings.
    const texts = added.map((tx) => {
      const content = chunkTransaction(tx);
      validateDocument({ id: tx.id, content, source: "plaid" });
      return content;
    });
    // Embed all chunks in batches instead of one model call per transaction.
    const vectors: number[][] = [];
    for await (const batch of embedDocuments(embedder, texts)) {
      vectors.push(...batch);
    }
    const points = added.map((tx, i) => ({
      id: toPointId(tx.id),
      vector: vectors[i] ?? [],
      payload: {
        transactionId: tx.id, // keep the original Plaid ID for lookups
        date: tx.date,
        name: tx.name,
        amount: tx.amount,
        category: tx.category,
        merchantName: tx.merchantName,
      },
    }));
    await qdrantClient.upsert(collectionName, { points, wait: true });
    return { transactionsIngested: added.length, nextCursor: result.nextCursor };
  } catch (error: unknown) {
    const plaidError = error as { error_type?: string; error_code?: string };
    // ITEM_ERROR and INVALID_INPUT are Plaid error types; error_code holds the specific code.
    if (plaidError.error_type === "ITEM_ERROR" || plaidError.error_type === "INVALID_INPUT") {
      throw new Error(`Plaid API error: ${plaidError.error_type} - ${plaidError.error_code ?? ""}`);
    }
    throw error;
  }
}
Create src/ingestion/scheduler.ts, which runs the ingestion on a timer:
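export function scheduleIngestion(fn: () => Promise<void>, intervalMs: number): { stop: () => void } {
  let running = false;
  const tick = (): void => {
    if (running) return; // previous run still in flight: skip so syncs never overlap
    running = true;
    void fn()
      .catch((error: unknown) => {
        console.error("Scheduled ingestion failed", error); // avoid an unhandled rejection
      })
      .finally(() => {
        running = false;
      });
  };
  tick();
  const id = setInterval(tick, intervalMs);
  return { stop: () => { clearInterval(id); } };
}
Expected output: TypeScript compiles. Each transaction is validated against DocumentSchema before embedding and upsert, and a scheduled run is skipped while the previous one is still in progress.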
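The ingestion step only indexes result.added, but a sync page also reports modified and removed transactions; without handling them, edited transactions keep stale text and deleted ones stay searchable. Below is a minimal sketch of turning one page into index operations. planSyncOps is hypothetical, and its rule that a removal wins over an add or modify of the same ID in one page is a design choice, not Plaid guidance.

```typescript
interface SyncTx {
  id: string;
}

interface SyncPageLite<T extends SyncTx> {
  added: T[];
  modified: T[];
  removed: { id: string }[];
}

// Hypothetical planner: upsert added and modified transactions, delete removed ones.
function planSyncOps<T extends SyncTx>(page: SyncPageLite<T>): { upsert: T[]; deleteIds: string[] } {
  const deleteIds = Array.from(new Set(page.removed.map((r) => r.id)));
  const removed = new Set(deleteIds);
  const byId = new Map<string, T>();
  for (const tx of [...page.added, ...page.modified]) {
    // Later entries (modified) replace earlier ones (added) with the same ID.
    if (!removed.has(tx.id)) byId.set(tx.id, tx);
  }
  return { upsert: Array.from(byId.values()), deleteIds };
}
```

Applying the plan would mean re-embedding and upserting the first list, then removing the second by point ID (using the same ID mapping as at upsert time) with the Qdrant client's delete method.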
Step 13: Add observability with Langfuse
Create src/services/observability/langfuse.ts:
import Langfuse from "langfuse" ;
interface TraceApi {
generation : (params : Record < string , unknown >) => void ;
event : (params : Record < string , unknown >) => void ;
update : (params : Record < string , unknown >) => void ;
}
interface LangfuseClient {
trace : (params : { name : string }) => {
generation : (params : Record < string , unknown >) => void ;
event : (params : Record < string , unknown >) => void ;
update : (params : Record < string , unknown >) => void ;
};
}
function createNoopClient () : LangfuseClient {
return {
trace : () => ({
generation : () => {},
event : () => {},
update : () => {},
}),
};
}
let cachedClient : LangfuseClient | undefined ;
export function createLangfuseClient (publicKey : string , secretKey : string , host : string ) : LangfuseClient {
if (cachedClient) return cachedClient;
let client : LangfuseClient ;
try {
const instance : unknown = new Langfuse ({ publicKey, secretKey, baseUrl: host });
client = {
trace : (params : { name : string }) : TraceApi => {
const traceInstance : unknown = (instance as { trace : (p : { name : string }) => unknown }). trace (params);
return {
generation : (gp : Record < string , unknown >) : void => {
(traceInstance as { generation : (p : Record < string , unknown >) => void }). generation (gp);
},
event : (ep : Record < string , unknown >) : void => {
(traceInstance as { event : (p : Record < string , unknown >) => void }). event (ep);
},
update : (up : Record < string , unknown >) : void => {
(traceInstance as { update : (p : Record < string , unknown >) => void }). update (up);
},
};
},
};
} catch {
console. warn ( "Langfuse init failed, using noop client" );
client = createNoopClient ();
}
cachedClient = client;
return client;
}
export function createTrace (client : LangfuseClient , name : string ) : ReturnType < LangfuseClient [ "trace" ]> {
return client. trace ({ name });
}
export function logGeneration (
trace : ReturnType < LangfuseClient [ "trace" ]>,
model : string ,
input : string ,
output : string ,
) : void {
trace. generation ({ name: model, model, input, output });
}
export function logRetrieval (
trace : ReturnType < LangfuseClient [ "trace" ]>,
query : string ,
resultsCount : number ,
) : void {
trace. event ({ name: "retrieval" , input: query, output: `retrieved ${ String ( resultsCount ) } results` });
}
Create src/instrumentation.ts, which initializes Langfuse at Next.js startup when LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY are set:
export async function register () : Promise < void > {
if (process.env.NEXT_RUNTIME === "nodejs" ) {
const { createLangfuseClient } = await import ( "./services/observability/langfuse.js" );
const publicKey = process.env.LANGFUSE_PUBLIC_KEY ?? "" ;
const secretKey = process.env.LANGFUSE_SECRET_KEY ?? "" ;
const host = process.env.LANGFUSE_HOST ?? "https://cloud.langfuse.com" ;
if (publicKey && secretKey) {
createLangfuseClient (publicKey, secretKey, host);
}
}
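}
Next.js 15 and later load instrumentation.ts automatically (the instrumentation API became stable in Next.js 15), so on Next.js 16 the experimental.instrumentationHook flag from Step 2 is not required and can be removed from next.config.ts. The NEXT_RUNTIME check combined with the dynamic import() inside register() keeps Langfuse's Node-only SDK out of the Edge runtime.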
Expected output: pnpm typecheck passes. Langfuse degrades gracefully to a no-op client on init failure.
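The Langfuse module combines two patterns: a module-level cache so every caller shares one client, and a no-op fallback so a telemetry failure never breaks a request. A stripped-down, SDK-free sketch of the same shape, where the hypothetical Tracer interface stands in for the Langfuse wrapper:

```typescript
// Hypothetical minimal tracer interface standing in for the Langfuse wrapper.
interface Tracer {
  event: (name: string) => void;
}

const noopTracer: Tracer = { event: () => {} };

// Build a getter that lazily creates one tracer, caches it, and falls back to
// (and caches) the no-op tracer if the factory throws.
function cachedTracer(factory: () => Tracer): () => Tracer {
  let cached: Tracer | undefined;
  return () => {
    if (cached) return cached;
    try {
      cached = factory();
    } catch {
      cached = noopTracer;
    }
    return cached;
  };
}
```

One consequence of this design, shared by createLangfuseClient: the cache ignores later arguments, so whichever caller runs first decides the credentials and host for the rest of the process.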
Step 14: Create the report export utilities
These let your users download their financial data as XLSX or CSV, or extract text from PDF bank statements.
Create src/services/reports/xlsx-export.ts:
import { utils, writeFile } from "xlsx" ;
export function generateTransactionSpreadsheet (
transactions : Record < string , unknown >[],
filePath : string ,
) : void {
const worksheet = utils. json_to_sheet (transactions);
const workbook = utils. book_new ();
utils. book_append_sheet (workbook, worksheet, "Transactions" );
writeFile (workbook, filePath);
}
Create src/services/reports/csv-export.ts:
function quoteField(value: unknown): string {
  const str = value == null ? "" : typeof value === "string" ? value : JSON.stringify(value);
  // RFC 4180: quote fields containing commas, quotes, or line breaks, doubling embedded quotes
  if (/[",\r\n]/.test(str)) {
    return `"${str.replace(/"/g, '""')}"`;
  }
  return str;
}
export function exportTransactionsToCSV (transactions : Record < string , unknown >[]) : string {
const headers = [ "date" , "name" , "amount" , "category" , "merchant" ];
const rows = transactions. map ((tx) =>
headers. map ((h) => quoteField (tx[h])). join ( "," ),
);
return [headers. join ( "," ), ... rows]. join ( "\n" );
}
Create src/services/reports/pdf-import.ts:
import { PDFParse } from "pdf-parse" ;
async function pdf (buffer : Buffer ) : Promise <{ text : string }> {
const parser = new PDFParse ({ data: buffer });
return parser. getText ();
}
export async function parsePdfStatement (buffer : Buffer ) : Promise < string > {
const result = await pdf (buffer);
return result.text;
}
Expected output: TypeScript compiles. exportTransactionsToCSV produces properly quoted CSV output.
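Because exportTransactionsToCSV returns a plain string, serving it as a file download only needs the right response headers. A minimal sketch, assuming a route handler passes in the CSV text; csvDownload is a hypothetical helper built on the standard Fetch API Response class, which Next.js route handlers may return directly:

```typescript
// Hypothetical helper: wrap CSV text in a Response that browsers save as a file.
// A route handler would call it with the output of exportTransactionsToCSV.
function csvDownload(csv: string, filename: string): Response {
  // Strip characters that would break the quoted filename parameter.
  const safeName = filename.replace(/["\r\n]/g, "");
  return new Response(csv, {
    status: 200,
    headers: {
      "Content-Type": "text/csv; charset=utf-8",
      "Content-Disposition": `attachment; filename="${safeName}"`,
    },
  });
}
```

generateTransactionSpreadsheet, by contrast, writes to a path on the server's filesystem, so a download route for XLSX would need to read that file back or generate the workbook in memory instead.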
Step 15: Build the API routes
Now wire everything together in the Next.js App Router.
Create app/api/health/route.ts:
import { NextRequest, NextResponse } from "next/server" ;
export function GET (_req : NextRequest ) : NextResponse {
return NextResponse. json ({ status: "ok" });
}
Create app/api/ask/route.ts, the Q&A endpoint:
import { NextRequest, NextResponse } from "next/server" ;
import { decideQuery } from "../../../src/services/router/decision-router.js" ;
import {
getOrCreateSession,
addTurn,
getContext,
} from "../../../src/services/session/manager.js" ;
import { createRagPipeline } from "../../../src/services/rag/pipeline.js" ;
import { createLangfuseClient, logGeneration } from "../../../src/services/observability/langfuse.js" ;
interface AskBody {
query ?: string ;
sessionId ?: string ;
userId ?: string ;
}
export async function POST (req : NextRequest ) : Promise < NextResponse > {
let query : string | undefined ;
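  try {
    let body: AskBody;
    try {
      body = (await req.json()) as AskBody;
    } catch {
      // Malformed JSON is a client error, not a server failure
      return NextResponse.json({ error: "invalid JSON body" }, { status: 400 });
    }
    query = body.query;
    const { userId } = body;
    if (!query) {
      return NextResponse.json(
        { error: "query required" },
        { status: 400 },
      );
    }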
const decision = await decideQuery (query);
if (decision.type === "CLARIFY" ) {
return NextResponse. json ({
decision: "CLARIFY" ,
prompt: decision.prompt,
});
}
if (decision.type === "FALLBACK" ) {
return NextResponse. json ({
decision: "FALLBACK" ,
message: "Please rephrase or contact support." ,
});
}
const sid = await getOrCreateSession (userId ?? "default" );
await addTurn (sid, "user" , query);
const contextMessages = await getContext (sid);
const context = contextMessages
. map ((m) => {
const content = typeof m.content === "string" ? m.content : JSON. stringify (m.content);
return `${ m . role }: ${ content }` ;
})
. join ( "\n" );
const rag = createRagPipeline ();
const rawAnswer = await rag. answer (query, context);
await addTurn (sid, "assistant" , rawAnswer.answer);
return NextResponse. json ({
answer: rawAnswer.answer,
sources: rawAnswer.sources,
sessionId: sid,
decision: "ROUTE" ,
target: decision.target,
});
} catch (error) {
try {
const lfClient = createLangfuseClient (
process.env.LANGFUSE_PUBLIC_KEY ?? "" ,
process.env.LANGFUSE_SECRET_KEY ?? "" ,
process.env.LANGFUSE_HOST ?? "https://cloud.langfuse.com" ,
);
const trace = lfClient. trace ({ name: "ask-error" });
const errorMessage = error instanceof Error ? error.message : "Unknown error" ;
logGeneration (trace, "ask-route" , query ?? "unknown" , errorMessage);
} catch {
// ignore telemetry errors
}
return NextResponse. json (
{ error: "Internal error" },
{ status: 500 },
);
}
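}
Notice the three decision paths: CLARIFY returns a prompt asking the user to narrow the question, FALLBACK returns a message asking the user to rephrase, and ROUTE runs the full RAG pipeline with session context and echoes the routing target. Requests without a userId all share the "default" session, so anonymous callers see one shared conversation history until you add per-user identities.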
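The route reads target only on ROUTE and prompt only on CLARIFY. Modeling the router result as a discriminated union lets the compiler check each branch. A minimal sketch, assuming decideQuery returns the three shapes the route uses; RouterDecision and earlyResponse are hypothetical names, not exports of the REAA router package:

```typescript
// Hypothetical union mirroring the fields the ask route reads from decideQuery().
type RouterDecision =
  | { type: "ROUTE"; target: string }
  | { type: "CLARIFY"; prompt: string }
  | { type: "FALLBACK" };

// Map non-ROUTE decisions to the JSON body the route returns; ROUTE yields null
// so the caller knows to run the RAG pipeline instead.
function earlyResponse(decision: RouterDecision): Record<string, string> | null {
  switch (decision.type) {
    case "CLARIFY":
      return { decision: "CLARIFY", prompt: decision.prompt };
    case "FALLBACK":
      return { decision: "FALLBACK", message: "Please rephrase or contact support." };
    case "ROUTE":
      return null;
    default: {
      // Compile-time exhaustiveness check: a new variant makes this assignment fail.
      const unreachable: never = decision;
      return unreachable;
    }
  }
}
```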
Create app/api/ingest/route.ts:
import { NextRequest, NextResponse } from "next/server" ;
import { createPlaidClient } from "../../../src/services/plaid/client.js" ;
import { initEmbedder } from "../../../src/services/embeddings/embedder.js" ;
import { createQdrantClient } from "../../../src/services/vector-store/qdrant-client.js" ;
import { ingestPlaidTransactions } from "../../../src/ingestion/plaid-ingest.js" ;
import { loadConfig } from "../../../src/config/index.js" ;
export async function POST (_req : NextRequest ) : Promise < NextResponse > {
try {
const config = loadConfig ();
const plaidClient = createPlaidClient (config.plaid);
const embedder = await initEmbedder ();
const qdrantClient = createQdrantClient (config.qdrant);
const result = await ingestPlaidTransactions (
plaidClient,
embedder,
qdrantClient,
"plaid-transactions" ,
config.plaid.accessToken,
);
return NextResponse. json ({
status: "ok" ,
transactionsIngested: result.transactionsIngested,
});
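  } catch (error) {
    // Log the underlying Plaid, embedding, or Qdrant error before returning a generic 500
    console.error("Ingestion failed", error);
    return NextResponse.json(
      { error: "Ingestion failed" },
      { status: 500 },
    );
  }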
}
Create the home page app/page.tsx, a simple client-side Q&A form:
"use client" ;
import { useState } from "react" ;
interface Source {
name : string ;
amount ?: number ;
date ?: string ;
}
interface AskResponse {
  answer?: string;
  sources?: Source[];
  sessionId?: string;
  decision?: string;
  target?: string;
  prompt?: string;
  message?: string;
  error?: string;
}
export default function Home() {
  const [query, setQuery] = useState("");
  const [answer, setAnswer] = useState("");
  const [sources, setSources] = useState<Source[]>([]);
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState("");
  async function handleAsk() {
    if (!query.trim()) return;
    setLoading(true);
    setAnswer("");
    setSources([]);
    setError("");
    try {
      const res = await fetch("/api/ask", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ query }),
      });
      const data = (await res.json()) as AskResponse;
      if (!res.ok) {
        // Error responses carry an `error` field, not an answer
        setError(data.error ?? "An error occurred");
      } else if (data.decision === "ROUTE") {
        setAnswer(data.answer ?? "");
        setSources(data.sources ?? []);
      } else {
        // CLARIFY and FALLBACK responses have no answer or sources
        setAnswer(data.prompt ?? data.message ?? "");
      }
    } catch {
      setError("Failed to reach the server");
    } finally {
      setLoading(false);
    }
  }
function handleKeyDown (e : React . KeyboardEvent < HTMLInputElement >) {
if (e.key === "Enter" ) {
handleAsk (). catch (() => {});
}
}
return (
<div style ={{ maxWidth: 640 , margin: "40px auto" , padding: "0 16px" }}>
<h1>Cohere Knowledge Agent — Plaid SMB Financial Insights</h1>
<div style ={{ display: "flex" , gap: 8 , marginBottom: 16 }}>
<input
type = "text"
value ={ query }
onChange ={( e ) => { setQuery ( e . target . value ); }}
placeholder = "Ask a financial question..."
style ={{ flex: 1 , padding: 8 }}
onKeyDown ={ handleKeyDown }
/>
<button onClick ={() => { handleAsk (). catch (() => {}); }} disabled ={ loading } style ={{ padding: "8px 16px" }}>
{ loading ? "Loading..." : "Ask" }
</button>
</div>
{ error && (
<div style ={{ color: "red" , marginBottom: 16 }}>{ error }</div>
)}
{ answer && (
<div style ={{ marginBottom: 16 }}>
<h2>Answer</h2>
<p>{ answer }</p>
</div>
)}
{ sources .length > 0 && (
<div>
<h2>Sources</h2>
<ul>
{ sources . map (( s , i ) => (
<li key ={ i }>
{ s . name }
{ s . amount !== undefined ? ` — $${ s . amount . toFixed ( 2 ) }` : "" }
{ s . date ? ` (${ s . date })` : "" }
</li>
))}
</ul>
</div>
)}
</div>
);
}
Expected output: pnpm typecheck passes. You can now run pnpm dev, visit http://localhost:3000, and see the Q&A form.
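The Sources list prints amount with toFixed(2), which shows Plaid's raw sign. In Plaid transaction data, positive amounts are money leaving the account and negative amounts are money coming in, so a deposit or refund renders as a negative dollar figure. A small formatter that makes the direction explicit, assuming sources carry Plaid's raw amount in USD (formatPlaidAmount is a hypothetical helper; a fuller version would pass each transaction's iso_currency_code to Intl.NumberFormat):

```typescript
// Assumes USD; Plaid reports the real currency per transaction in iso_currency_code.
const usd = new Intl.NumberFormat("en-US", { style: "currency", currency: "USD" });

// Plaid convention: positive = outflow (money leaves the account),
// negative = inflow (deposits, refunds). Render the direction as a sign.
function formatPlaidAmount(amount: number): string {
  const magnitude = usd.format(Math.abs(amount));
  return amount > 0 ? `-${magnitude}` : `+${magnitude}`;
}
```

In the list item, `formatPlaidAmount(s.amount)` would replace the `$${s.amount.toFixed(2)}` expression.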
Step 16: Run the tests
Now create the test suite. The testing strategy: mock external services (Plaid, Cohere, Qdrant, fastembed) with vi.mock, exercise route handlers directly with NextRequest/NextResponse, and verify all three decision paths through POST /api/ask.
Create tests/app/api/ask/route.test.ts:
import { describe, it, expect, vi, beforeEach } from "vitest" ;
import { NextRequest } from "next/server" ;
import type { Message } from "@reaatech/session-continuity" ;
import { POST } from "../../../../app/api/ask/route.js" ;
vi. mock ( "../../../../src/services/router/decision-router.js" , () => ({
decideQuery: vi. fn (),
}));
vi. mock ( "../../../../src/services/session/manager.js" , () => ({
getOrCreateSession: vi. fn (),
addTurn: vi. fn (),
getContext: vi. fn (),
}));
vi.mock("../../../../src/services/rag/pipeline.js", () => ({
  createRagPipeline: vi.fn(),
}));
// Stub telemetry so the 500-path test never constructs a real Langfuse client
vi.mock("../../../../src/services/observability/langfuse.js", () => ({
  createLangfuseClient: vi.fn(() => ({
    trace: vi.fn(() => ({ generation: vi.fn(), event: vi.fn(), update: vi.fn() })),
  })),
  logGeneration: vi.fn(),
}));
import { decideQuery } from "../../../../src/services/router/decision-router.js";
import {
  getOrCreateSession,
  addTurn,
  getContext,
} from "../../../../src/services/session/manager.js";
import { createRagPipeline } from "../../../../src/services/rag/pipeline.js";
interface AskResponseData {
  answer?: string;
  sources?: unknown[];
  sessionId?: string;
  decision?: string;
  target?: string;
  error?: string;
  prompt?: string;
  message?: string;
}
async function post(body: Record<string, unknown>): Promise<Response> {
  const req = new NextRequest(new URL("http://localhost:3000/api/ask"), {
    method: "POST",
    body: JSON.stringify(body),
  });
  return POST(req);
}
async function getJson(response: Response): Promise<unknown> {
  return response.json();
}
const mockedDecideQuery = vi.mocked(decideQuery);
const mockedGetOrCreateSession = vi.mocked(getOrCreateSession);
const mockedAddTurn = vi.mocked(addTurn);
const mockedGetContext = vi.mocked(getContext);
const mockedCreateRagPipeline = vi.mocked(createRagPipeline);
describe("POST /api/ask", () => {
  beforeEach(() => {
    vi.clearAllMocks();
  });
  it("happy: returns 200 with answer/sources/sessionId/decision", async () => {
    mockedDecideQuery.mockResolvedValueOnce({ type: "ROUTE", target: "cash_flow" });
    mockedGetOrCreateSession.mockResolvedValueOnce("session-1");
    mockedAddTurn.mockResolvedValue("msg-1");
    const mockMessages: Message[] = [{
      role: "user",
      content: "what is my cash flow",
      id: "1",
      sessionId: "session-1",
      createdAt: new Date(),
    }];
    mockedGetContext.mockResolvedValueOnce(mockMessages);
    mockedCreateRagPipeline.mockReturnValueOnce({
      answer: vi.fn().mockResolvedValue({
        answer: "Your cash flow is strong",
        sources: [{ name: "Store", amount: 50 }],
      }),
      retrieve: vi.fn(),
    });
    const response = await post({ query: "what is my cash flow" });
    const json: unknown = await getJson(response);
    const data = json as AskResponseData;
    expect(response.status).toBe(200);
    expect(data.answer).toBe("Your cash flow is strong");
    expect(data.sources).toHaveLength(1);
    expect(data.sessionId).toBe("session-1");
    expect(data.decision).toBe("ROUTE");
    expect(data.target).toBe("cash_flow");
  });
  it("error: empty body returns 400", async () => {
    const response = await post({});
    const json: unknown = await getJson(response);
    const data = json as AskResponseData;
    expect(response.status).toBe(400);
    expect(data.error).toBe("query required");
  });
  it("boundary: CLARIFY decision returns prompt not answer", async () => {
    mockedDecideQuery.mockResolvedValueOnce({
      type: "CLARIFY",
      prompt: "Did you mean cash flow or balance?",
    });
    const response = await post({ query: "money" });
    const json: unknown = await getJson(response);
    const data = json as AskResponseData;
    expect(response.status).toBe(200);
    expect(data.decision).toBe("CLARIFY");
    expect(data.prompt).toBe("Did you mean cash flow or balance?");
    expect(data.answer).toBeUndefined();
  });
  it("boundary: FALLBACK decision returns message", async () => {
    mockedDecideQuery.mockResolvedValueOnce({
      type: "FALLBACK",
    });
    const response = await post({ query: "random stuff" });
    const json: unknown = await getJson(response);
    const data = json as AskResponseData;
    expect(response.status).toBe(200);
    expect(data.decision).toBe("FALLBACK");
    expect(data.message).toBe("Please rephrase or contact support.");
  });
  it("error: internal error returns 500", async () => {
    mockedDecideQuery.mockRejectedValueOnce(new Error("Unexpected error"));
    const response = await post({ query: "test" });
    expect(response.status).toBe(500);
  });
});
The remaining test files follow the same pattern, one per service module, covering happy, error, and boundary paths:
src/services/plaid/transactions.test.ts: mock transactionsSync, test typed results and Plaid error extraction
src/services/plaid/transformer.test.ts: test chunkTransaction with all fields and with null merchants
src/services/embeddings/embedder.test.ts: mock FlagEmbedding.init, test that embedDocuments yields batches
src/services/vector-store/qdrant-client.test.ts: mock getCollections, test that ensureCollection creates the collection if missing
src/services/cohere/chat.test.ts: mock the Cohere V2 chat endpoint, test answer extraction
src/services/rag/pipeline.test.ts: mock the embedder, Qdrant, and Cohere; test the end-to-end answer flow
src/services/repair/answer-repair.test.ts: test fence stripping, truncation repair, and UnrepairableError handling
src/services/session/manager.test.ts: test session creation, message round-trip, and context compression
src/services/router/decision-router.test.ts: test routing, ambiguous queries, and the empty fallback
src/ingestion/plaid-ingest.test.ts: test ingestion with 5 transactions and with an empty array
Run the suite with coverage:
pnpm vitest run --coverage --reporter=json --outputFile=vitest-report.json
Expected output: all tests pass (numFailedTests: 0), and coverage meets the 90% threshold for lines, branches, functions, and statements.
Next steps
Deploy to production: replace the in-memory session store with @reaatech/session-continuity-storage-firestore or DynamoDB for persistent sessions across server restarts
Add streaming: wire answerQuestionStream into the ask route and send Cohere response deltas via a ReadableStream for a ChatGPT-like typing experience
Schedule ingestion: use scheduleIngestion(ingestFn, 3600000) in instrumentation.ts to re-fetch Plaid transactions every hour
Add authentication: integrate with NextAuth or Clerk so each user gets their own Plaid link and isolated Qdrant collection
Build a report download UI: add a button that calls a route handler, which in turn runs exportTransactionsToCSV or generateTransactionSpreadsheet on the server
The remainder of src/services/session/manager.ts, continuing the InMemoryStorage.createSession signature:
    Session,
    "id" | "createdAt" | "lastActivityAt"
  >): Promise<Session> {
    const id = crypto.randomUUID();
    const now = new Date();
    const newSession: Session = {
      ...session,
      id,
      createdAt: now,
      lastActivityAt: now,
    };
    this.sessions.set(id, newSession);
    this.messages.set(id, []);
    return await Promise.resolve(newSession);
  }
  async getSession(id: SessionId): Promise<Session | null> {
    return await Promise.resolve(this.sessions.get(id) ?? null);
  }
  async updateSession(
    id: SessionId,
    updates: Partial<Session>,
  ): Promise<Session> {
    const existing = this.sessions.get(id);
    if (!existing) {
      throw new Error(`Session ${id} not found`);
    }
    const updated: Session = {
      ...existing,
      ...updates,
      lastActivityAt: new Date(),
    };
    this.sessions.set(id, updated);
    return await Promise.resolve(updated);
  }
  async deleteSession(id: SessionId): Promise<void> {
    this.sessions.delete(id);
    this.messages.delete(id);
    await Promise.resolve();
  }
  async listSessions(): Promise<Session[]> {
    return await Promise.resolve(Array.from(this.sessions.values()));
  }
  async addMessage(
    sessionId: SessionId,
    message: Omit<Message, "id" | "sessionId" | "createdAt">,
  ): Promise<Message> {
    const id = crypto.randomUUID();
    const newMessage: Message = {
      ...message,
      id,
      sessionId,
      createdAt: new Date(),
    };
    const existing = this.messages.get(sessionId) ?? [];
    existing.push(newMessage);
    this.messages.set(sessionId, existing);
    return await Promise.resolve(newMessage);
  }
  async getMessages(sessionId: SessionId): Promise<Message[]> {
    return await Promise.resolve(this.messages.get(sessionId) ?? []);
  }
  async updateMessage(
    sessionId: SessionId,
    messageId: MessageId,
    updates: Partial<Message>,
  ): Promise<Message> {
    const msgs = this.messages.get(sessionId);
    if (!msgs) {
      throw new Error(`Session ${sessionId} not found`);
    }
    const idx = msgs.findIndex((m) => m.id === messageId);
    if (idx === -1) {
      throw new Error(`Message ${messageId} not found in session ${sessionId}`);
    }
    const updated: Message = { ...msgs[idx], ...updates };
    msgs[idx] = updated;
    return await Promise.resolve(updated);
  }
  async deleteMessage(sessionId: SessionId, messageId: MessageId): Promise<void> {
    const msgs = this.messages.get(sessionId);
    if (!msgs) {
      await Promise.resolve();
      return;
    }
    this.messages.set(
      sessionId,
      msgs.filter((m) => m.id !== messageId),
    );
    await Promise.resolve();
  }
  async deleteAllMessages(sessionId: SessionId): Promise<void> {
    this.messages.delete(sessionId);
    await Promise.resolve();
  }
  async getExpiredSessions(before: Date): Promise<SessionId[]> {
    return await Promise.resolve(
      Array.from(this.sessions.entries())
        .filter(([, s]) => s.expiresAt !== undefined && s.expiresAt <= before)
        .map(([id]) => id),
    );
  }
  async health(): Promise<HealthStatus> {
    return await Promise.resolve({ status: "healthy" });
  }
  async close(): Promise<void> {
    await Promise.resolve();
  }
}
class SimpleTokenCounter implements TokenCounter {
  readonly model = "simple";
  readonly tokenizer = "simple-word-count";
  count(text: string): number {
    // Rough heuristic: ~1.3 tokens per whitespace-separated word plus a per-character term
    return Math.ceil(text.split(/\s+/).length * 1.3) + text.length * 0.3;
  }
  countMessages(messages: Message[]): number {
    let total = 0;
    for (const msg of messages) {
      const content =
        typeof msg.content === "string"
          ? msg.content
          : JSON.stringify(msg.content);
      total += this.count(content) + 4; // small per-message overhead
    }
    return total;
  }
}
let managerInstance: SessionManager | undefined;
const userSessions = new Map<string, string>();
function getManager(): SessionManager {
  if (!managerInstance) {
    managerInstance = new SessionManager({
      storage: new InMemoryStorage(),
      tokenCounter: new SimpleTokenCounter(),
      tokenBudget: {
        maxTokens: 4096,
        reserveTokens: 500,
        overflowStrategy: "compress",
      },
      compression: {
        strategy: "sliding_window",
        targetTokens: 3500,
      },
    });
  }
  return managerInstance;
}
export function createSessionManager(): SessionManager {
  return getManager();
}
export async function getOrCreateSession(userId: string): Promise<string> {
  const existingId = userSessions.get(userId);
  if (existingId) {
    try {
      // getSession may resolve to null for a missing session rather than throw,
      // so check the result before reusing the cached id
      const existing = await getManager().getSession(existingId);
      if (existing) return existingId;
    } catch {
      // fall through and create a fresh session
    }
    userSessions.delete(userId);
  }
  const session = await getManager().createSession({ userId });
  userSessions.set(userId, session.id);
  return session.id;
}
export async function addTurn(
  sessionId: string,
  role: "user" | "assistant",
  content: string,
): Promise<string> {
  const message = await getManager().addMessage(sessionId, { role, content });
  return message.id;
}
export async function getContext(sessionId: string): Promise<Message[]> {
  return getManager().getConversationContext(sessionId);
}
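The session manager is configured with a 4096-token budget, 500 reserved tokens, and sliding_window compression targeting 3500 tokens. Conceptually, a sliding window keeps the newest messages and drops the oldest until the history fits the target. A minimal, library-independent sketch of that idea, assuming a caller-supplied token counter; it is not @reaatech/session-continuity's actual algorithm, and ChatMessage and slidingWindow are hypothetical names:

```typescript
// Hypothetical message shape for this sketch.
interface ChatMessage {
  role: "user" | "assistant";
  content: string;
}

// Keep the most recent messages whose combined token count fits targetTokens.
// Walks backwards from the newest message and stops at the first one that
// would overflow, so the kept messages stay contiguous and in original order.
function slidingWindow(
  messages: ChatMessage[],
  targetTokens: number,
  countTokens: (text: string) => number,
): ChatMessage[] {
  const kept: ChatMessage[] = [];
  let used = 0;
  for (let i = messages.length - 1; i >= 0; i--) {
    const cost = countTokens(messages[i].content);
    if (used + cost > targetTokens) break;
    kept.push(messages[i]);
    used += cost;
  }
  return kept.reverse();
}
```

Dropping from the oldest end favors recent turns, which usually carry the context a follow-up question like "and last month?" depends on.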