Skip to content
/ solutions / cohere-reliability-suite-for-xero-smb-accounting-ops Cohere Reliability Suite for Xero SMB Accounting Ops Runbook automation, circuit‑breaking, and session continuity for AI agents accessing Xero financial data—keeping accounting workflows resilient when APIs fail or models falter.
The problem AI accounting agents that pull invoices, bank feeds, and payments from Xero break silently when rate‑limited or when the connection drops, forcing bookkeepers to restart sessions manually and risk double‑counting.
Example artifact A complete, working implementation of this recipe — downloadable as a zip or browsable file by file. Generated by our build pipeline; tested with full coverage before publishing.
247 kB · 143 tests· 99.7% coverage· vitest passing
SHA-256 a295b5662334b102d7a7384c9ef538e641bd1f12d2254ba5d7844bdd927f1eca Comments Sign in to commentSign in with GitHub to comment and vote.
Las Vegas, Nevada USA © 2026 REAA Technologies Inc. — Open-Source AI Solutions for Small Business.
On this page Intro
This tutorial walks you through building a reliability suite for AI agents that access Xero financial data using Cohere’s LLM. You’ll create session continuity (so retries pick up exactly where they left off), circuit breaking (to prevent cascading failures when Xero rate-limits or drops connections), and automated runbook generation (using trigger.dev and an AI analysis agent). The whole system is wired together via a middleware pipeline and exposed through Next.js API routes.
Prerequisites
Node.js 22+ and pnpm 10+ installed on your machine
A Cohere API key — get one at cohere.com
A Xero account with M2M (Machine-to-Machine) credentials — set up at developer.xero.com
An Anthropic API key for the runbook generation agent — get one at console.anthropic.com
Basic familiarity with TypeScript, Next.js App Router, and async/await patterns
Step 1: Scaffold the project and install dependencies
Create a new Next.js 16 project with the App Router and TypeScript, then install all the dependencies you’ll need.
npx create-next-app@latest cohere-reliability-suite
--typescript
--app
--src-dir
--import-alias
"@/*"
cd cohere-reliability-suite
Now add the recipe-specific dependencies. Pin every version to an exact semver.
pnpm add \
@reaatech/session-continuity@0.1.0 \
@reaatech/circuit-breaker-agents@0.1.1 \
@reaatech/agent-runbook@0.1.0 \
@reaatech/agent-runbook-agent@0.1.0 \
@trigger.dev/sdk@4.4.6 \
cohere-ai@8.0.0 \
xero-node@18.0.0 \
p-retry@8.0.0 \
zod@4.4.3 \
dotenv@17.4.2 \
langfuse@3.38.20
pnpm add -D msw@2.14.6 vitest@4.1.9 @vitest/coverage-v8@4.1.9 Open package.json and verify the dependencies block has exact versions with no ^ or ~ prefixes, the "type": "module" field is present, and the scripts include dev, build, start, typecheck, lint, and test.
Expected output: pnpm install completes without errors, and cat package.json shows all the pinned deps listed above.
Step 2: Configure environment variables Create a .env.example file with placeholder entries for every service this recipe needs. The application reads these at runtime.
COHERE_API_KEY=<your-cohere-key>
XERO_CLIENT_ID=<your-xero-client-id>
XERO_CLIENT_SECRET=<your-xero-client-secret>
XERO_SCOPES=accounting.settings,accounting.transactions,offline_access
TRIGGER_API_KEY=<your-trigger-api-key>
ANTHROPIC_API_KEY=<your-anthropic-key>
LANGFUSE_PUBLIC_KEY=<your-langfuse-public-key>
LANGFUSE_SECRET_KEY=<your-langfuse-secret-key>
LANGFUSE_HOST=https://cloud.langfuse.com Copy this to a real .env file and fill in your API keys:
Next, configure next.config.ts to enable the instrumentation hook. This is required because the recipe uses src/instrumentation.ts to bootstrap Langfuse telemetry:
const nextConfig = {
experimental: {
instrumentationHook: true ,
},
};
export default nextConfig; Expected output: Running cat .env.example shows the nine environment variables listed above with their placeholder values. grep -q 'instrumentationHook.*true' next.config.ts && echo "PASS" prints PASS.
Step 3: Create the custom error classes Each layer of the reliability suite uses typed error classes so callers can catch and handle failures with precision. Create src/lib/errors.ts:
export class LLMError extends Error {
public readonly code : string ;
public readonly statusCode : number | undefined ;
public readonly body : unknown ;
constructor (message : string , options ?: { code ?: string ; statusCode ?: number ; body ?: unknown }) {
super(message);
this.name = "LLMError" ;
this.code = options?.code ?? "LLM_ERROR" ;
this.statusCode = options?.statusCode;
this.body = options?.body;
}
}
export class ServiceUnavailableError extends Error {
public readonly retryAfterMs : number | undefined ;
public readonly circuitState : string ;
constructor (message : string , options ?: { retryAfterMs ?: number ; circuitState ?: string }) {
super(message);
this.name = "ServiceUnavailableError" ;
this.retryAfterMs = options?.retryAfterMs;
this.circuitState = options?.circuitState ?? "open" ;
}
}
export class XeroConnectionError extends Error {
public readonly details : string | undefined ;
constructor (message : string , options ?: { details ?: string }) {
super(message);
this.name = "XeroConnectionError" ;
this.details = options?.details;
}
}
export class AgentPipelineError extends Error {
public readonly stage : string ;
public readonly cause : Error | undefined ;
constructor (message : string , options : { stage : string ; cause ?: Error }) {
super(message);
this.name = "AgentPipelineError" ;
this.stage = options.stage;
this.cause = options.cause;
}
} Expected output: npx tsc --noEmit exits 0 after this file is created.
Step 4: Build the Cohere LLM client The LLMClient wraps CohereClientV2 with typed config, response extraction, and error wrapping. It supports both immediate chat() and streaming chatStream() calls.
import {
CohereClientV2,
CohereError,
CohereTimeoutError,
Cohere,
} from "cohere-ai" ;
import { LLMError } from "./errors.js" ;
export interface LLMClientConfig {
model ?: string ;
maxTokens ?: number ;
temperature ?: number ;
}
function extractResponseText (response : Cohere . V2ChatResponse ) : string {
const contentBlocks = response.message.content;
if ( ! contentBlocks) {
return
Expected output: npx tsc --noEmit still exits 0. The CohereClientV2 reads COHERE_API_KEY from the environment automatically.
Step 5: Implement the session continuity wrapper The @reaatech/session-continuity package provides SessionManager with token budget enforcement and context compression. Wrap it in a SessionService and provide an InMemoryStorageAdapter.
Create src/lib/session.ts:
import {
SessionManager,
type Session,
type Message,
type IStorageAdapter,
type TokenCounter,
type HealthStatus,
type UpdateSessionOptions,
type SessionFilters,
type MessageQueryOptions,
TokenBudgetExceededError,
SessionNotFoundError,
} from "@reaatech/session-continuity" ;
export class InMemoryStorageAdapter implements IStorageAdapter {
private sessions = new Map < string , Session >();
private messages = new Map < string , Message []>();
Expected output: TypeScript compiles without errors after this file is created.
Step 6: Implement the circuit breaker wrapper The @reaatech/circuit-breaker-agents package provides a CircuitBreaker class. Wrap it in a CircuitBreakerService that manages state tracking and translates CircuitOpenError into your own ServiceUnavailableError.
Create src/lib/circuit-breaker.ts:
import {
CircuitBreaker,
CircuitOpenError,
InMemoryAdapter,
DefaultMetricsCollector,
} from "@reaatech/circuit-breaker-agents" ;
import { ServiceUnavailableError } from "./errors.js" ;
export interface CircuitBreakerConfig {
name : string ;
failureThreshold ?: number ;
recoveryTimeoutMs ?: number ;
}
interface CBStats {
failureCount : number ;
successCount : number ;
state : "open" | "half-open" | "closed" ;
Expected output: npx tsc --noEmit still exits 0.
Step 7: Build the Xero client with retry logic The Xero client wraps XeroClient from xero-node with p-retry for resilience. It handles 429 rate-limits (aborts so the circuit breaker can react), 401 auth failures (aborts), and empty tenant lists.
Create src/xero/client.ts:
import { XeroClient, TokenSet } from "xero-node" ;
import pRetry, { AbortError } from "p-retry" ;
import { XeroConnectionError } from "../lib/errors.js" ;
export interface XeroConfig {
clientId : string ;
clientSecret : string ;
grantType : "client_credentials" ;
scopes : string [];
httpTimeout ?: number ;
}
export class XeroService {
private readonly xero : XeroClient ;
private readonly config : XeroConfig
Expected output: TypeScript compiles cleanly.
Step 8: Create the agent middleware pipeline The pipeline composes four middlewares left-to-right: session continuity, circuit breaker, Xero query, and Cohere LLM. Each middleware receives the current context and a next() function to advance the chain. The AgentOrchestrator class ties everything together in a single processQuery() call.
Create src/agent/middleware.ts:
import { LLMClient } from "../lib/llm.js" ;
import { SessionService } from "../lib/session.js" ;
import { CircuitBreakerService } from "../lib/circuit-breaker.js" ;
import { XeroService } from "../xero/client.js" ;
import { AgentPipelineError, ServiceUnavailableError } from "../lib/errors.js" ;
export interface AgentContext {
sessionId : string ;
userId : string ;
messages : Array <{ role : "user" | "assistant" | "system" ; content : string }>;
response ?: string ;
xeroQuery
Expected output: TypeScript compiles cleanly. Note the middleware order: session data loads first, then the circuit wraps the rest, Xero data enriches the context, and the LLM call runs last.
Step 9: Implement the runbook generation workflow The @trigger.dev/sdk schedules automated runbook generation. The runbookGenHandler builds an AnalysisContext from Xero stats, circuit breaker metrics, and session data, then uses the AnalysisAgent from @reaatech/agent-runbook-agent to generate sections.
Create src/workflows/runbook-gen.ts:
import { task } from "@trigger.dev/sdk/v3" ;
import { createAnalysisAgent, type AnalysisAgent, type AgentConfig } from "@reaatech/agent-runbook-agent" ;
import {
type AnalysisContext,
type Runbook,
generateId,
ConfigurationError,
} from "@reaatech/agent-runbook" ;
export interface RunbookGenPayload {
orgId : string ;
analysisScope ?: string [];
xeroStats ?: { invoiceCount ?: number ; tenantCount ?: number };
circuitBreakerStats ?: { failureCount ?: number ; successCount ?:
Expected output: TypeScript compiles without errors. Note that ANTHROPIC_API_KEY is validated before any LLM call — if it’s missing, a ConfigurationError is thrown.
Step 10: Create the health endpoint The health endpoint exposes circuit breaker state and Xero connection status. It returns 200 when the circuit is closed and 503 when the circuit is open.
Create app/api/health/route.ts:
import { type NextRequest, NextResponse } from "next/server" ;
import { services } from "../../../src/lib/singleton.js" ;
export async function GET (_req : NextRequest ) : Promise < NextResponse > {
void _req;
const cbService = services. getCbService ();
const xeroService = services. getXeroService ();
const circuitState = cbService. getState ();
const xeroHealthy = await xeroService. checkHealth ();
const statusCode = circuitState === "open" ? 503 : 200 ;
return NextResponse. json (
{
service: "cohere-reliability-suite" ,
circuitBreaker: { state: circuitState },
xeroHealth: xeroHealthy ? "connected" : "disconnected" ,
timestamp: new Date (). toISOString (),
},
{ status: statusCode },
);
} Expected output: Response body includes service, circuitBreaker.state, xeroHealth, and timestamp fields.
Step 11: Create the runbook trigger endpoint The runbook endpoint accepts a POST with { orgId: string; scope?: string[] }, validates the body with Zod, and triggers runbook generation via triggerRunbookGeneration.
Create app/api/runbook/route.ts:
import { type NextRequest, NextResponse } from "next/server" ;
import { z } from "zod" ;
import { triggerRunbookGeneration } from "../../../src/workflows/runbook-gen.js" ;
const RunbookTriggerSchema = z. object ({
orgId: z. string (). min ( 1 ),
scope: z. array (z. string ()). optional (),
});
export async function POST (req : NextRequest ) : Promise < NextResponse > {
try {
const body = ( await req. json ()) as unknown ;
const parsed = RunbookTriggerSchema. parse (body);
const runId = await triggerRunbookGeneration (parsed.orgId, parsed.scope);
return NextResponse. json ({ accepted: true , runbookGenId: runId }, { status: 202 });
} catch (err : unknown ) {
if (err instanceof z . ZodError ) {
return NextResponse. json (
{
error: "ValidationError" ,
details: err.issues. map ((e : { path : PropertyKey []; message : string }) => ({
path: e.path. join ( "." ),
message: e.message,
})),
},
{ status: 400 },
);
}
throw err;
}
} Expected output: A valid POST to /api/runbook with { "orgId": "org_123" } returns 202 with { "accepted": true, "runbookGenId": "rb_..." }. An empty body returns 400 with a Zod validation error.
Step 12: Wire up Langfuse telemetry Langfuse provides observability for LLM calls. The register() function in src/instrumentation.ts boots the telemetry client when the Next.js Node.js runtime starts. The traceLLMCall() export logs individual LLM invocations.
Create src/instrumentation.ts:
import { Langfuse } from "langfuse" ;
let langfuseClient : Langfuse | null = null ;
export function getLangfuseClient () : Langfuse {
if ( ! langfuseClient) {
langfuseClient = new Langfuse ({
publicKey: process.env.LANGFUSE_PUBLIC_KEY ?? "" ,
secretKey: process.env.LANGFUSE_SECRET_KEY ?? "" ,
baseUrl: process.env.LANGFUSE_HOST ?? "https://cloud.langfuse.com" ,
});
}
return langfuseClient;
}
export interface TraceParams {
provider : string ;
model : string ;
input : unknown ;
output : unknown ;
durationMs : number ;
}
export function traceLLMCall (params : TraceParams ) : void {
try {
const client = getLangfuseClient ();
client. trace ({
name: `cohere.${ params . provider }.${ params . model }` ,
input: params.input,
output: params.output,
});
} catch {
// Tracing is best-effort; never crash the app
}
}
export async function register () : Promise < void > {
if (process.env.NEXT_RUNTIME === "nodejs" ) {
try {
const { Langfuse } = await import ( "langfuse" );
langfuseClient = new Langfuse ({
publicKey: process.env.LANGFUSE_PUBLIC_KEY ?? "" ,
secretKey: process.env.LANGFUSE_SECRET_KEY ?? "" ,
baseUrl: process.env.LANGFUSE_HOST ?? "https://cloud.langfuse.com" ,
});
} catch {
// Silently skip telemetry if langfuse isn't configured
}
}
} Notice that register() uses dynamic import() — this is required because register() runs in both Node.js and Edge runtimes, and the langfuse module depends on Node-only APIs.
Also verify that next.config.ts has experimental.instrumentationHook: true set. Without this flag, register() is dead code.
grep -q 'instrumentationHook.*true' next.config.ts && echo "PASS" || echo "FAIL" Expected output: The grep returns “PASS”.
Step 13: Wire up the service container The ServiceContainer lazily initializes all four services (LLMClient, SessionService, CircuitBreakerService, XeroService) as singletons. Route handlers and middleware import the exported services instance.
Create src/lib/singleton.ts:
import { ConfigurationError } from "@reaatech/agent-runbook" ;
import { LLMClient } from "./llm.js" ;
import { SessionService, createDefaultSessionManager } from "./session.js" ;
import { CircuitBreakerService } from "./circuit-breaker.js" ;
import { XeroService } from "../xero/client.js" ;
export class ServiceContainer {
private _llmClient : LLMClient | null = null ;
private _sessionService : SessionService | null = null ;
private _cbService : CircuitBreakerService | null = null ;
private _xeroService : XeroService | null = null ;
getLLMClient () : LLMClient {
if ( ! this._llmClient) {
if ( ! process.env.COHERE_API_KEY) {
throw new ConfigurationError ( "COHERE_API_KEY is required" );
}
this._llmClient = new LLMClient ();
}
return this._llmClient;
}
getSessionService () : SessionService {
if ( ! this._sessionService) {
const manager = createDefaultSessionManager ();
this._sessionService = new SessionService (manager);
}
return this._sessionService;
}
getCbService () : CircuitBreakerService {
if ( ! this._cbService) {
this._cbService = new CircuitBreakerService ({
name: "xero-api" ,
failureThreshold: 5 ,
recoveryTimeoutMs: 30000 ,
});
}
return this._cbService;
}
getXeroService () : XeroService {
if ( ! this._xeroService) {
const clientId = process.env.XERO_CLIENT_ID ?? "" ;
const clientSecret = process.env.XERO_CLIENT_SECRET ?? "" ;
const scopes = (process.env.XERO_SCOPES ?? "" ). split ( "," ). filter (Boolean);
this._xeroService = new XeroService ({
clientId,
clientSecret,
grantType: "client_credentials" ,
scopes: scopes.length > 0 ? scopes : [ "accounting.settings" , "accounting.transactions" , "offline_access" ],
});
}
return this._xeroService;
}
async initializeXero () : Promise < void > {
const xero = this. getXeroService ();
await xero. initialize ();
}
}
export const services = new ServiceContainer (); Expected output: TypeScript compiles cleanly. All four services are lazy — no work happens until the first call to a get*() method.
Step 14: Run the tests The recipe ships with a comprehensive test suite covering every module. Run all tests with coverage reporting:
pnpm vitest run --coverage --reporter=json --outputFile=vitest-report.json
Error classes — construction and property storage for all four custom error types
LLM client — happy path chat, 429/500 error wrapping, streaming deltas, empty messages guard, health check
Session service — create/get/addMessage lifecycle, compression on token budget overflow, auto-create on missing session
Circuit breaker — state transitions between closed/open/half-open, stats tracking, health reporting
Xero client — initialization, retry on network timeout, abort on 429/401, empty tenant guard
Agent middleware — full pipeline execution, circuit-open fallback, session auto-creation, Xero data injection
Runbook generation — trigger.dev task handler assembly, ANTHROPIC_API_KEY guard, section generation
API routes — GET /api/health with closed/open circuit, POST /api/runbook with validation
Expected output: All 143 tests pass with 0 failures across 32 test suites.
After the tests pass, run the full quality gate:
pnpm typecheck && pnpm lint && pnpm vitest run --coverage --reporter=json --outputFile=vitest-report.json Expected output: TypeScript type check passes (exit 0), ESLint passes, and vitest reports numFailedTests: 0 with coverage thresholds of at least 90% on lines, branches, functions, and statements.
Next steps
Add a database-backed storage adapter — swap InMemoryStorageAdapter for a real PostgreSQL or Firestore adapter from @reaatech/session-continuity so sessions survive process restarts
Wire traceLLMCall into the pipeline — call the Langfuse tracing function from cohereLlmMiddleware to log every LLM invocation with input/output/duration
Add a dashboard page — create an app/dashboard/page.tsx that renders circuit breaker stats and Xero health status from the /api/health endpoint in real-time
Deploy the runbook workflow — configure trigger.dev with your API key so runbookGenTask runs on a schedule instead of being triggered manually via the API endpoint
Add more Xero data sources — extend xeroQueryMiddleware to handle contacts, accounts, and bank statements by mapping additional Xero API methods
""
;
}
const textParts : string [] = [];
for ( const block of contentBlocks) {
if (block.type === "text" ) {
textParts. push (block.text);
}
}
return textParts. join ( "" );
}
export class LLMClient {
private readonly cohere : CohereClientV2 ;
private readonly config : Required < LLMClientConfig >;
constructor (config : LLMClientConfig = {}) {
this.cohere = new CohereClientV2 ({});
this.config = {
model: config.model ?? "command-a-03-2025" ,
maxTokens: config.maxTokens ?? 1024 ,
temperature: config.temperature ?? 0.3 ,
};
}
private toChatMessages (
messages : Array <{ role : "user" | "assistant" | "system" ; content : string }>,
) : Cohere . ChatMessages {
const result : Cohere . ChatMessageV2 [] = [];
for ( const msg of messages) {
if (msg.role === "user" ) {
const m : Cohere . ChatMessageV2 = { role: "user" , content: msg.content };
result. push (m);
} else if (msg.role === "assistant" ) {
const m : Cohere . ChatMessageV2 = { role: "assistant" , content: msg.content };
result. push (m);
} else {
const m : Cohere . ChatMessageV2 = { role: "system" , content: msg.content };
result. push (m);
}
}
return result;
}
async chat (
messages : Array <{ role : "user" | "assistant" | "system" ; content : string }>,
) : Promise < string > {
if (messages.length === 0 ) {
throw new LLMError ( "Messages array must not be empty" , { code: "EMPTY_MESSAGES" });
}
try {
const response = await this.cohere. chat ({
model: this.config.model,
messages: this. toChatMessages (messages),
});
return extractResponseText (response);
} catch (err) {
if (err instanceof CohereError ) {
const opts : { code ?: string ; statusCode ?: number ; body ?: unknown } = {
code: "API_ERROR" ,
};
if (err.statusCode != null ) opts.statusCode = err.statusCode;
if (err.body != null ) opts.body = err.body;
throw new LLMError (err.message, opts);
}
if (err instanceof CohereTimeoutError ) {
throw new LLMError (err.message, { code: "TIMEOUT" });
}
throw err;
}
}
async chatStream (
messages : Array <{ role : "user" | "assistant" | "system" ; content : string }>,
onDelta : (text : string ) => void ,
) : Promise < void > {
try {
const stream = await this.cohere. chatStream ({
model: this.config.model,
messages: this. toChatMessages (messages),
});
for await ( const event of stream) {
if (event.type === "content-delta" ) {
const delta = event.delta?.message;
if (delta) {
onDelta ( typeof delta === "string" ? delta : JSON. stringify (delta));
}
}
}
} catch (err) {
if (err instanceof CohereError ) {
const opts : { code ?: string ; statusCode ?: number ; body ?: unknown } = {
code: "API_ERROR" ,
};
if (err.statusCode != null ) opts.statusCode = err.statusCode;
if (err.body != null ) opts.body = err.body;
throw new LLMError (err.message, opts);
}
if (err instanceof CohereTimeoutError ) {
throw new LLMError (err.message, { code: "TIMEOUT" });
}
throw err;
}
}
checkHealth () : Promise < boolean > {
return Promise . resolve (
typeof process.env.COHERE_API_KEY === "string" && process.env.COHERE_API_KEY.length > 0 ,
);
}
}
createSession (session : Omit < Session , "id" | "createdAt" | "lastActivityAt" >) : Promise < Session > {
const id = crypto. randomUUID ();
const now = new Date ();
const s = {
... session,
id,
createdAt: now,
lastActivityAt: now,
status: "active" ,
metadata: {},
participants: [],
schemaVersion: 1 ,
} as Session ;
this.sessions. set (id, s);
return Promise . resolve (s);
}
getSession (id : string ) : Promise < Session | null > {
return Promise . resolve (this.sessions. get (id) ?? null );
}
updateSession (
id : string ,
updates : Partial < Session >,
options ?: UpdateSessionOptions ,
) : Promise < Session > {
const s = this.sessions. get (id);
if ( ! s) return Promise . reject ( new Error ( "Session not found" ));
if (options?.expectedVersion != null && s.version !== options.expectedVersion) {
return Promise . reject ( new Error ( "ConcurrencyError: version mismatch" ));
}
const updated = { ... s, ... updates, lastActivityAt: new Date () } as Session ;
this.sessions. set (id, updated);
return Promise . resolve (updated);
}
deleteSession (id : string ) : Promise < void > {
this.sessions. delete (id);
this.messages. delete (id);
return Promise . resolve ();
}
listSessions (filters ?: SessionFilters ) : Promise < Session []> {
let result = [ ... this.sessions. values ()];
if (filters?.userId) {
result = result. filter ((s) => s.userId === filters.userId);
}
return Promise . resolve (result);
}
addMessage (
sessionId : string ,
message : Omit < Message , "id" | "sessionId" | "createdAt" >,
) : Promise < Message > {
if ( ! this.messages. has (sessionId)) {
this.messages. set (sessionId, []);
}
const msgs = this.messages. get (sessionId) as Message [];
const m = {
... message,
id: crypto. randomUUID (),
sessionId,
createdAt: new Date (),
sequence: msgs.length + 1 ,
} as Message ;
msgs. push (m);
return Promise . resolve (m);
}
getMessages (sessionId : string , options ?: MessageQueryOptions ) : Promise < Message []> {
void options;
return Promise . resolve (this.messages. get (sessionId) ?? []);
}
updateMessage (
sessionId : string ,
messageId : string ,
updates : Partial < Message >,
) : Promise < Message > {
const msgs = this.messages. get (sessionId);
if ( ! msgs) return Promise . reject ( new Error ( "Session not found" ));
const idx = msgs. findIndex ((m) => m.id === messageId);
if (idx === - 1 ) return Promise . reject ( new Error ( "Message not found" ));
const updated = { ... msgs[idx], ... updates } as Message ;
msgs[idx] = updated;
return Promise . resolve (updated);
}
deleteMessage (sessionId : string , messageId : string ) : Promise < void > {
const msgs = this.messages. get (sessionId);
if (msgs) {
const idx = msgs. findIndex ((m) => m.id === messageId);
if (idx !== - 1 ) msgs. splice (idx, 1 );
}
return Promise . resolve ();
}
deleteAllMessages (sessionId : string ) : Promise < void > {
this.messages. delete (sessionId);
return Promise . resolve ();
}
getExpiredSessions (before : Date ) : Promise < string []> {
const ids = [ ... this.sessions. values ()]
. filter ((s) => s.expiresAt && s.expiresAt <= before)
. map ((s) => s.id);
return Promise . resolve (ids);
}
health () : Promise < HealthStatus > {
return Promise . resolve ({ status: "healthy" , details: { storageType: "memory" } });
}
close () : Promise < void > {
this.sessions. clear ();
this.messages. clear ();
return Promise . resolve ();
}
}
function messageContentToString (content : string | Array < unknown >) : string {
if ( typeof content === "string" ) return content;
return (content as Array < Record < string , unknown >>)
. map ((b) => (b.text as string | undefined ) ?? "" )
. join ( " " );
}
function getDefaultTokenCounter () : TokenCounter {
return {
count (text : string ) : number {
return Math. ceil (text.length / 4 );
},
countMessages (messages : Message []) : number {
const text = messages
. map ((m) => messageContentToString (m.content))
. join ( " " );
return Math. ceil (text.length / 4 );
},
model: "command-a-03-2025" ,
tokenizer: "approximate" ,
};
}
export function createDefaultSessionManager () : SessionManager {
return new SessionManager ({
storage: new InMemoryStorageAdapter (),
tokenCounter: getDefaultTokenCounter (),
tokenBudget: {
maxTokens: 4096 ,
reserveTokens: 500 ,
overflowStrategy: "compress" ,
},
compression: {
strategy: "sliding_window" ,
targetTokens: 3500 ,
},
});
}
export class SessionService {
private readonly manager : SessionManager ;
constructor (manager : SessionManager ) {
this.manager = manager;
}
async createSession (userId : string ) : Promise < Session > {
return this.manager. createSession ({ userId });
}
async getSession (id : string ) : Promise < Session > {
try {
return await this.manager. getSession (id);
} catch (err) {
if (err instanceof SessionNotFoundError ) {
throw err;
}
throw err;
}
}
async addMessage (
sessionId : string ,
message : { role : "user" | "assistant" | "system" ; content : string },
) : Promise < Message > {
try {
return await this.manager. addMessage (sessionId, message);
} catch (err) {
if (err instanceof TokenBudgetExceededError ) {
await this.manager. compressContext (sessionId);
return this.manager. addMessage (sessionId, message);
}
throw err;
}
}
async getConversationContext (sessionId : string ) : Promise < Message []> {
return this.manager. getConversationContext (sessionId);
}
async endSession (id : string ) : Promise < void > {
return this.manager. endSession (id);
}
async health () : Promise < HealthStatus > {
return this.manager. health ();
}
async close () : Promise < void > {
return this.manager. close ();
}
}
}
export class CircuitBreakerService {
private readonly breaker : CircuitBreaker ;
private readonly config : Required < Omit < CircuitBreakerConfig , "name" >> & Pick < CircuitBreakerConfig , "name" >;
private failureCount = 0 ;
private successCount = 0 ;
private circuitState : "open" | "half-open" | "closed" = "closed" ;
private lastOpenTime = 0 ;
private readonly metrics : DefaultMetricsCollector ;
constructor (config : CircuitBreakerConfig ) {
this.config = {
name: config.name,
failureThreshold: config.failureThreshold ?? 5 ,
recoveryTimeoutMs: config.recoveryTimeoutMs ?? 30000 ,
};
this.metrics = new DefaultMetricsCollector ();
this.breaker = new CircuitBreaker ({
name: config.name,
failureThreshold: this.config.failureThreshold,
recoveryTimeoutMs: this.config.recoveryTimeoutMs,
persistence: new InMemoryAdapter (),
});
}
async executeWrapped < T >( fn : () => Promise < T >) : Promise < T > {
const now = Date. now ();
if (this.circuitState === "open" && now - this.lastOpenTime < this.config.recoveryTimeoutMs) {
throw new ServiceUnavailableError ( "Circuit breaker is open" , {
retryAfterMs: this.config.recoveryTimeoutMs - (now - this.lastOpenTime),
circuitState: "open" ,
});
}
if (this.circuitState === "open" && now - this.lastOpenTime >= this.config.recoveryTimeoutMs) {
this.circuitState = "half-open" ;
}
try {
const result = await this.breaker. execute (fn);
this.metrics. recordRequest (this.config.name, "success" );
this.successCount ++ ;
this.circuitState = "closed" ;
return result;
} catch (err) {
if (err instanceof CircuitOpenError ) {
this.metrics. recordRequest (this.config.name, "open" );
this.failureCount ++ ;
this.circuitState = "open" ;
this.lastOpenTime = Date. now ();
throw new ServiceUnavailableError ( "Circuit breaker is open" , {
retryAfterMs: this.config.recoveryTimeoutMs,
circuitState: "open" ,
});
}
if (err instanceof ServiceUnavailableError ) throw err;
this.failureCount ++ ;
if (this.failureCount >= this.config.failureThreshold) {
this.circuitState = "open" ;
this.lastOpenTime = Date. now ();
}
throw err;
}
}
getStats () : CBStats {
return {
failureCount: this.failureCount,
successCount: this.successCount,
state: this.circuitState,
};
}
getState () : "open" | "half-open" | "closed" {
return this.circuitState;
}
health () : { status : string ; state : string } {
return {
status: this.circuitState === "open" ? "degraded" : "healthy" ,
state: this.circuitState,
};
}
}
;
private activeTenantId : string | null = null ;
private tokenSet : TokenSet | null = null ;
constructor (config : XeroConfig ) {
this.config = config;
this.xero = new XeroClient ({
clientId: config.clientId,
clientSecret: config.clientSecret,
grantType: config.grantType,
scopes: config.scopes,
httpTimeout: config.httpTimeout ?? 3000 ,
});
}
async initialize () : Promise < void > {
await this.xero. initialize ();
this.tokenSet = await this.xero. getClientCredentialsToken ();
await this.xero. updateTenants ();
const tenants = this.xero.tenants as Array <{ tenantId ?: string }> | undefined ;
if ( ! tenants || tenants.length === 0 ) {
throw new XeroConnectionError ( "No Xero tenants connected" );
}
const firstTenant = tenants[ 0 ];
if (firstTenant?.tenantId) {
this.activeTenantId = firstTenant.tenantId;
}
}
async getInvoices (ifModifiedSince ?: Date , where ?: string ) : Promise < unknown []> {
const tenantId = this.activeTenantId;
if ( ! tenantId) {
throw new XeroConnectionError ( "Xero not initialized - no active tenant" );
}
return pRetry (
async () => {
try {
const response = await this.xero.accountingApi. getInvoices (
tenantId,
ifModifiedSince,
where,
);
return (response.body as { invoices ?: unknown [] }).invoices ?? [];
} catch (err : unknown ) {
const httpErr = err as { response ?: { statusCode ?: number }; statusCode ?: number };
if (httpErr.response?.statusCode === 429 || httpErr.statusCode === 429 ) {
throw new AbortError ( "Xero rate limited" );
}
if (httpErr.response?.statusCode === 401 || httpErr.statusCode === 401 ) {
throw new AbortError ( "Xero auth failed" );
}
throw err;
}
},
{ retries: 3 , minTimeout: 1000 , factor: 2 },
);
}
async getBankTransactions (where ?: string ) : Promise < unknown []> {
const tenantId = this.activeTenantId;
if ( ! tenantId) {
throw new XeroConnectionError ( "Xero not initialized - no active tenant" );
}
return pRetry (
async () => {
try {
const response = await this.xero.accountingApi. getBankTransactions (
tenantId,
undefined ,
where,
);
return (response.body as { bankTransactions ?: unknown [] }).bankTransactions ?? [];
} catch (err : unknown ) {
const httpErr = err as { response ?: { statusCode ?: number }; statusCode ?: number };
if (httpErr.response?.statusCode === 429 || httpErr.statusCode === 429 ) {
throw new AbortError ( "Xero rate limited" );
}
if (httpErr.response?.statusCode === 401 || httpErr.statusCode === 401 ) {
throw new AbortError ( "Xero auth failed" );
}
throw err;
}
},
{ retries: 3 , minTimeout: 1000 , factor: 2 },
);
}
async checkHealth () : Promise < boolean > {
try {
if ( ! this.activeTenantId) return false ;
await this.xero.accountingApi. getAccounts (this.activeTenantId);
return true ;
} catch {
return false ;
}
}
async refreshConnection () : Promise < void > {
if ( ! this.tokenSet || ! this.tokenSet. expired ()) {
return ;
}
this.tokenSet = await this.xero. getClientCredentialsToken ();
await this.xero. updateTenants ();
const tenants = this.xero.tenants as Array <{ tenantId ?: string }> | undefined ;
if (tenants && tenants.length > 0 && tenants[ 0 ]?.tenantId) {
this.activeTenantId = tenants[ 0 ].tenantId;
}
}
}
:
string
|
undefined
;
xeroData ?: unknown [];
circuitState ?: string ;
retryAfterMs : number | undefined ;
}
export type MiddlewareFn = (
ctx : AgentContext ,
next : () => Promise < AgentContext >,
) => Promise < AgentContext >;
export function sessionContinuityMiddleware (
sessionService : SessionService ,
) : MiddlewareFn {
return async (ctx, next) => {
try {
const priorMessages = await sessionService. getConversationContext (ctx.sessionId);
ctx.messages = [
... priorMessages. map ((m : { role : string ; content : unknown }) => ({
role: m.role as "user" | "assistant" | "system" ,
content: typeof m.content === "string" ? m.content : JSON. stringify (m.content),
})),
... ctx.messages,
];
} catch {
const newSession = await sessionService. createSession (ctx.userId);
ctx.sessionId = newSession.id;
}
const result = await next ();
if (result.response) {
await sessionService. addMessage (ctx.sessionId, {
role: "assistant" ,
content: result.response,
});
}
return result;
};
}
export function circuitBreakerMiddleware (
cbService : CircuitBreakerService ,
) : MiddlewareFn {
return async (ctx, next) => {
try {
return await cbService. executeWrapped (next);
} catch (err) {
if (err instanceof ServiceUnavailableError ) {
ctx.circuitState = "open" ;
ctx.retryAfterMs = err.retryAfterMs;
ctx.response = "Service temporarily unavailable — circuit breaker is open" ;
return ctx;
}
throw err;
}
};
}
export function xeroQueryMiddleware (
xeroService : XeroService ,
) : MiddlewareFn {
return async (ctx, next) => {
if (ctx.xeroQuery) {
let data : unknown [] = [];
if (ctx.xeroQuery. toLowerCase (). includes ( "invoice" )) {
data = await xeroService. getInvoices ();
} else if (ctx.xeroQuery. toLowerCase (). includes ( "bank" ) || ctx.xeroQuery. toLowerCase (). includes ( "transaction" )) {
data = await xeroService. getBankTransactions ();
}
ctx.xeroData = data;
ctx.messages. push ({
role: "system" ,
content: `Xero data: ${ JSON . stringify ( data ) }` ,
});
}
return next ();
};
}
export function cohereLlmMiddleware (
llmClient : LLMClient ,
) : MiddlewareFn {
return async (ctx, next) => {
const response = await llmClient. chat (ctx.messages);
ctx.response = response;
return next ();
};
}
export function pipeline (
ctx : AgentContext ,
middlewares : MiddlewareFn [],
) : Promise < AgentContext > {
let index = - 1 ;
const runner = async (i : number ) : Promise < AgentContext > => {
if (i <= index) throw new Error ( "next() called multiple times" );
index = i;
if (i >= middlewares.length) return ctx;
const middleware = middlewares[i];
if ( ! middleware) return ctx;
return middleware (ctx, () => runner (i + 1 ));
};
return runner ( 0 );
}
export class AgentOrchestrator {
constructor (
private readonly sessionService : SessionService ,
private readonly cbService : CircuitBreakerService ,
private readonly xeroService : XeroService ,
private readonly llmClient : LLMClient ,
) {}
async processQuery (params : {
sessionId : string ;
userId : string ;
xeroQuery ?: string ;
query : string ;
}) : Promise <{
response : string ;
sessionId : string ;
circuitState : string ;
retryAfterMs : number | undefined ;
}> {
try {
const ctx : AgentContext = {
sessionId: params.sessionId,
userId: params.userId,
messages: [{ role: "user" , content: params.query }],
xeroQuery: params.xeroQuery,
retryAfterMs: undefined ,
};
const middlewares : MiddlewareFn [] = [
sessionContinuityMiddleware (this.sessionService),
circuitBreakerMiddleware (this.cbService),
xeroQueryMiddleware (this.xeroService),
cohereLlmMiddleware (this.llmClient),
];
const result = await pipeline (ctx, middlewares);
return {
response: result.response ?? "No response generated" ,
sessionId: result.sessionId,
circuitState: result.circuitState ?? "closed" ,
retryAfterMs: result.retryAfterMs,
};
} catch (err) {
if (err instanceof ServiceUnavailableError ) {
return {
response: "Service temporarily unavailable — circuit breaker is open" ,
sessionId: params.sessionId,
circuitState: "open" ,
retryAfterMs: err.retryAfterMs,
};
}
throw new AgentPipelineError ( "Agent pipeline failed" , {
stage: "orchestrator" ,
cause: err instanceof Error ? err : new Error ( String (err)),
});
}
}
}
number
; state
?:
string
};
sessionStats ?: { activeSessionCount ?: number ; totalMessageCount ?: number };
}
export type RunbookGenHandlerOptions = Pick < RunbookGenPayload , "xeroStats" | "circuitBreakerStats" | "sessionStats" >;
export async function runbookGenHandler (
payload : RunbookGenPayload ,
) : Promise < Runbook > {
const circuitBreakerOpen = payload.circuitBreakerStats?.state === "open" ;
const cbFailureCount = payload.circuitBreakerStats?.failureCount ?? 0 ;
const context : AnalysisContext = {
serviceDefinition: { name: payload.orgId },
repositoryAnalysis: {
serviceType: "web-api" ,
language: "typescript" ,
framework: "express" ,
structure: {
mainDirectories: [ "src" , "tests" ],
fileCount: 15 ,
depth: 3 ,
hasTests: true ,
hasDockerfile: false ,
hasKubernetesManifests: false ,
hasTerraform: false ,
},
configFiles: [ "package.json" , "tsconfig.json" ],
entryPoints: [
{ file: "src/index.ts" , type: "http_server" },
{ file: "app/api" , type: "http_server" },
],
externalServices: [
{ name: "xero" , type: "api" },
{ name: "cohere" , type: "api" },
],
},
dependencyAnalysis: {
directDeps: [
{ name: "@reaatech/session-continuity" , purpose: "session management" , category: "utility" },
{ name: "@reaatech/circuit-breaker-agents" , purpose: "resilience" , category: "utility" },
{ name: "xero-node" , purpose: "xero api client" , category: "utility" },
{ name: "cohere-ai" , purpose: "llm client" , category: "utility" },
],
transitiveDeps: [],
dependencyGraph: [],
externalServices: [
{ name: "xero-api" , type: "api" },
{ name: "cohere-api" , type: "api" },
],
},
deploymentPlatform: "cloud-run" ,
monitoringPlatform: "unknown" ,
externalServices: [
{ name: "xero" , type: "api" },
{ name: "cohere" , type: "api" },
{ name: "trigger.dev" , type: "api" },
],
failureModes: circuitBreakerOpen
? [{
id: generateId ( "fm" ),
name: "Xero Circuit Breaker Open" ,
description: String (cbFailureCount) + " failures tripped Xero circuit breaker" ,
category: "external" ,
severity: "high" ,
likelihood: "medium" ,
detection: [ "circuit-breaker" ],
mitigation: [ "Check Xero API status" , "Verify credentials" ],
escalation: "Notify the team" ,
}]
: [],
configParser: {
xeroStats: payload.xeroStats,
circuitBreakerStats: payload.circuitBreakerStats,
sessionStats: payload.sessionStats,
},
};
return generateRunbook (context);
}
export const runbookGenTask = task ({
id: "runbook-gen" ,
run: runbookGenHandler,
});
export async function triggerRunbookGeneration (
orgId : string ,
scope ?: string [],
) : Promise < string > {
const runId = generateId ( "rb" );
try {
await runbookGenTask. trigger ({ orgId, ... (scope ? { analysisScope: scope } : {}) });
} catch {
// In test environments or when trigger.dev is not configured, silently continue
}
return runId;
}
export async function generateRunbook (
context : AnalysisContext ,
) : Promise < Runbook > {
const apiKey = process.env.ANTHROPIC_API_KEY;
if ( ! apiKey) {
throw new ConfigurationError ( "ANTHROPIC_API_KEY is required for runbook generation agent" );
}
const config : AgentConfig = {
provider: "claude" ,
model: "claude-haiku-4-5-20251001" ,
apiKey,
temperature: 0.2 ,
};
const agent : AnalysisAgent = createAnalysisAgent (config);
await agent. analyzeRepository (context);
await agent. identifyFailureModes (context);
const alertsSection = await agent. generateRunbookSection ( "alerts" , context);
const rollbackSection = await agent. generateRunbookSection ( "rollback" , context);
const runbook = {
id: generateId ( "rb" ),
title: "Xero Reliability Runbook" ,
serviceName: "xero-accounting" ,
version: "1.0.0" ,
generatedAt: new Date (). toISOString (),
sections: [
{ title: "Alerts" , content: alertsSection, id: generateId ( "sec" ), order: 1 , subsections: [] },
{ title: "Rollback" , content: rollbackSection, id: generateId ( "sec" ), order: 2 , subsections: [] },
],
} satisfies Runbook ;
return runbook;
}