SMBs using Databricks for AI workloads have no easy way to monitor spending, latency, or error rates across multiple models, leading to bill shock and debugging blind spots.
This tutorial walks you through building a complete LLM observability suite for a small-to-medium business running AI workloads on Databricks. You’ll instrument OpenAI and Anthropic SDK calls with OpenTelemetry GenAI semantic conventions, track token usage and cost per team, detect budget anomalies, expose Prometheus metrics, and serve a real-time admin dashboard — all with Next.js App Router and Express. By the end you’ll have a working observability pipeline you can extend to any model provider.
Prerequisites
Node.js 22+ and pnpm 10+ installed
A Databricks workspace (for the SQL-backed store) — or skip it and use the in-memory store for local development
API keys for OpenAI, Anthropic, and Langfuse (Langfuse is the visualization backend)
Basic familiarity with TypeScript, Next.js App Router, and OpenTelemetry concepts (traces, spans, span processors)
Step 1: Scaffold the project and configure environment variables
The project has been scaffolded with Next.js 16 (App Router). All the config files (package.json, tsconfig.json, next.config.ts, and vitest.config.ts) are on disk. Open .env.example to see the environment variables you’ll need, then copy it to .env.local and fill in your real values.
Expected output: The file .env.local exists with your Databricks, OpenAI, Anthropic, and Langfuse credentials.
Step 2: Create the in-memory observability store
The observability store is the central data structure that collects CostSpan records from every instrumented LLM call. It runs in-memory so you can start developing immediately without a database.
The store holds all CostSpan records in an array and exposes query methods: per-team spend, latency percentiles, anomaly detection, top models, time-series bucketing, and a summary endpoint.
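The store's source is not reproduced in this section, so here is a minimal sketch of the shape described above. It assumes a simplified record type: `SpanRecord`, its `latencyMs` field, and the `InMemoryStoreSketch` class are hypothetical stand-ins, not the real CostSpan type or the project's store.

```typescript
// Hypothetical, simplified stand-in for CostSpan. The real type comes from
// @reaatech/llm-cost-telemetry and may have different fields.
interface SpanRecord {
  model: string;
  tenant?: string;
  costUsd: number;
  latencyMs: number; // assumed field, for illustration only
  timestamp: Date;
}

class InMemoryStoreSketch {
  private readonly spans: SpanRecord[] = [];

  // Append one record; nothing is persisted beyond process memory.
  record(span: SpanRecord): void {
    this.spans.push(span);
  }

  // Return a copy so callers cannot mutate the internal array.
  getAllMetrics(): SpanRecord[] {
    return this.spans.slice();
  }

  // Total spend per team; spans without a tenant fall under "default".
  getTeamSpend(): Map<string, number> {
    const totals = new Map<string, number>();
    for (const s of this.spans) {
      const team = s.tenant ?? "default";
      totals.set(team, (totals.get(team) ?? 0) + s.costUsd);
    }
    return totals;
  }

  // Models ranked by total cost, highest first.
  getTopModels(limit: number): { model: string; cost: number }[] {
    const totals = new Map<string, number>();
    for (const s of this.spans) {
      totals.set(s.model, (totals.get(s.model) ?? 0) + s.costUsd);
    }
    return Array.from(totals.entries())
      .map(([model, cost]) => ({ model, cost }))
      .sort((a, b) => b.cost - a.cost)
      .slice(0, limit);
  }
}
```

Keeping every query as a pure function over one array makes it easier to swap the store for a SQL-backed one later, because each method maps onto a single aggregate query.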
Expected output: TypeScript compiles the file without errors when you run pnpm typecheck.
Step 3: Implement cost tracking with @reaatech/llm-cost-telemetry
The @reaatech/llm-cost-telemetry package provides the CostSpan type and utilities like calculateCostFromTokens, generateId, and now. Create src/lib/cost.ts to wrap these into cost processors and a pricing provider:
ts
import {
  generateId,
  now,
  calculateCostFromTokens,
  getWindowStart,
  getWindowEnd,
  type CostSpan,
  type TelemetryContext,
  CostSpanSchema,
} from "@reaatech/llm-cost-telemetry";
import {
  loadConfig,
  createProcessorFactory,
  createCostSpanProcessor,
  createMetricsBuilder,
  spanToCostSpan,
} from "@reaatech/otel-cost-exporter";
import type { SpanProcessor, ReadableSpan } from "@opentelemetry/sdk-trace-base";

export type { CostSpan, TelemetryContext, ReadableSpan };
export {
  generateId,
  now,
  calculateCostFromTokens,
  getWindowStart,
Key points:
initCostTracking() wires up the @reaatech/otel-cost-exporter pipeline and returns an OTel span processor that attaches cost data to every LLM span.
DatabricksPricingProvider implements the PricingProvider interface and estimates dollar cost per model call using per-model token pricing for GPT-4o, Claude Sonnet 4, and others.
createAggregatedCostSummary() groups cost spans by tenant, model, or hourly window — useful for the dashboard.
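To make the per-model token pricing idea concrete, here is a minimal sketch of how a cost estimate could be computed from token counts. The model names and prices below are placeholders chosen for easy arithmetic, not real provider pricing, and `estimateCostUsd` is a hypothetical helper rather than part of the PricingProvider interface.

```typescript
// Prices in USD per one million tokens. These are placeholder numbers for
// illustration only, not real provider pricing.
interface ModelPrice {
  inputPerMillion: number;
  outputPerMillion: number;
}

const PLACEHOLDER_PRICES = new Map<string, ModelPrice>([
  ["example-model-a", { inputPerMillion: 2, outputPerMillion: 8 }],
  ["example-model-b", { inputPerMillion: 4, outputPerMillion: 16 }],
]);

// Returns the estimated cost in USD, or undefined when the model has no
// price entry, so callers can tell "unknown model" apart from "free".
function estimateCostUsd(
  model: string,
  inputTokens: number,
  outputTokens: number
): number | undefined {
  const price = PLACEHOLDER_PRICES.get(model);
  if (!price) {
    return undefined;
  }
  return (
    (inputTokens * price.inputPerMillion +
      outputTokens * price.outputPerMillion) /
    1e6
  );
}
```

For example, `estimateCostUsd("example-model-a", 1_000_000, 500_000)` works out to (1,000,000 × 2 + 500,000 × 8) / 1,000,000 = 6 USD with the placeholder table above.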
Expected output: The file exports all the utilities the rest of the pipeline needs.
Step 4: Build the Langfuse span processor
Langfuse is your trace visualization backend. The Langfuse client receives OTel spans and creates traces with cost metadata.
Create src/lib/langfuse.ts:
ts
import Langfuse from "langfuse";
import type { CostSpan } from "@reaatech/llm-cost-telemetry";
import type { Context } from "@opentelemetry/api";
import type { Span, SpanProcessor, ReadableSpan } from "@opentelemetry/sdk-trace-base";
import type { SpanExporter } from "@opentelemetry/sdk-trace-base";

let langfuseClient: Langfuse | null = null;

export function initLangfuse(config?: {
  publicKey?: string;
  secretKey?: string;
  baseUrl?: string;
}): Langfuse {
  const publicKey = config?.publicKey ?? process.env.LANGFUSE_PUBLIC_KEY ?? "";
  const secretKey = config?.secretKey ?? process.env.LANGFUSE_SECRET_KEY ?? "";
  const baseUrl = config?.baseUrl ?? process.env.LANGFUSE_BASE_URL;
  if (!publicKey || !secretKey) {
    throw new Error("Langfuse public key and secret key are required");
  }
  langfuseClient = new Langfuse({ publicKey, secretKey, baseUrl });
  return langfuseClient;
}

export function getLangfuseClient(): Langfuse | null {
  return langfuseClient;
}

export async function flushLangfuse(): Promise<void> {
  if (langfuseClient) {
    await langfuseClient.flushAsync();
  }
}

export async function shutdownLangfuse(): Promise<void> {
  if (langfuseClient) {
    await langfuseClient.shutdownAsync();
    langfuseClient = null;
  }
}

export function createLangfuseSpanProcessor(): SpanProcessor {
  return {
    onStart(span: Span, parentContext: Context): void {
      void span;
      void parentContext;
    },
    onEnd(span: ReadableSpan): void {
      if (!langfuseClient) return;
      const attrs = span.attributes;
      const model =
        typeof attrs["gen_ai.response.model"] === "string"
          ? attrs["gen_ai.response.model"]
          : typeof attrs["gen_ai.request.model"] === "string"
            ? attrs["gen_ai.request.model"]
            : "";
      const inputTokens = Number(attrs["gen_ai.usage.input_tokens"]) || 0;
      const outputTokens = Number(attrs["gen_ai.usage.output_tokens"]) || 0;
      const costUsd =
        typeof attrs["llm.cost.total"] === "number" ? attrs["llm.cost.total"] : 0;
      langfuseClient.trace({
        name: "llm-cost",
        metadata: {
          model,
          provider: model.split("/")[0] || "unknown",
          costUsd,
          inputTokens,
          outputTokens,
          tenant: "",
          timestamp: new Date().toISOString(),
        },
      });
    },
    async forceFlush(): Promise<void> {},
    async shutdown(): Promise<void> {},
  };
}

export function sendCostDataToLangfuse(span: CostSpan): void {
  if (!langfuseClient) {
    return;
  }
  langfuseClient.trace({
    name: "llm-cost",
    metadata: {
      model: span.model,
      provider: span.provider,
      costUsd: span.costUsd,
      inputTokens: span.inputTokens,
      outputTokens: span.outputTokens,
      tenant: span.tenant,
      timestamp: span.timestamp instanceof Date ? span.timestamp.toISOString() : "",
    },
  });
}

export { Langfuse };
export type { SpanExporter };
The createLangfuseSpanProcessor() function returns a compliant SpanProcessor object. In its onEnd() method, it reads GenAI semantic-convention attributes from the span — gen_ai.response.model, gen_ai.usage.input_tokens, gen_ai.usage.output_tokens — and pushes a trace to Langfuse with cost metadata. The sendCostDataToLangfuse() function does the same but accepts a CostSpan object directly.
Expected output: A Langfuse trace appears in your Langfuse dashboard each time a span completes.
Step 5: Create the budget service
The budget service tracks daily and monthly spending per team, fires alerts when thresholds are exceeded, and computes token costs.
checkBudget() computes daily usage from token costs and monthly usage from a separate date-range filter using getWindowStart() / getWindowEnd() — so daily and monthly limits track independently. The getAlertThresholds() helper iterates all budget configs and returns only the teams whose usage exceeds their threshold.
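The budget service listing is not included in this section. As a rough sketch of the independent daily and monthly windows described above, the following assumes UTC calendar boundaries and simplified types. `SpendRecord`, `BudgetConfigSketch`, `BudgetStatusSketch`, and `checkBudgetSketch` are hypothetical names, and plain `Date` arithmetic stands in for getWindowStart() / getWindowEnd(), whose signatures are not shown here.

```typescript
interface SpendRecord {
  tenant: string;
  costUsd: number;
  timestamp: Date;
}

interface BudgetConfigSketch {
  dailyLimitUsd: number;
  monthlyLimitUsd: number;
  alertThresholdPct: number; // e.g. 80 means "alert at 80% of either limit"
}

interface BudgetStatusSketch {
  dailyUsedUsd: number;
  monthlyUsedUsd: number;
  dailyPct: number;
  monthlyPct: number;
  alert: boolean;
}

// Sum one tenant's spend in the half-open window [start, end).
function sumBetween(records: SpendRecord[], tenant: string, start: Date, end: Date): number {
  return records
    .filter(
      (r) =>
        r.tenant === tenant &&
        r.timestamp.getTime() >= start.getTime() &&
        r.timestamp.getTime() < end.getTime()
    )
    .reduce((sum, r) => sum + r.costUsd, 0);
}

function checkBudgetSketch(
  records: SpendRecord[],
  tenant: string,
  config: BudgetConfigSketch,
  now: Date
): BudgetStatusSketch {
  const y = now.getUTCFullYear();
  const m = now.getUTCMonth();
  // The two windows are computed separately, so the limits track independently.
  const dayStart = new Date(Date.UTC(y, m, now.getUTCDate()));
  const dayEnd = new Date(dayStart.getTime() + 24 * 60 * 60 * 1000);
  const monthStart = new Date(Date.UTC(y, m, 1));
  const monthEnd = new Date(Date.UTC(y, m + 1, 1)); // month 12 rolls into next year

  const dailyUsedUsd = sumBetween(records, tenant, dayStart, dayEnd);
  const monthlyUsedUsd = sumBetween(records, tenant, monthStart, monthEnd);
  const dailyPct = (dailyUsedUsd * 100) / config.dailyLimitUsd;
  const monthlyPct = (monthlyUsedUsd * 100) / config.monthlyLimitUsd;

  return {
    dailyUsedUsd,
    monthlyUsedUsd,
    dailyPct,
    monthlyPct,
    alert: dailyPct >= config.alertThresholdPct || monthlyPct >= config.alertThresholdPct,
  };
}
```

Passing `now` in as a parameter, rather than reading the clock inside the function, keeps the window logic deterministic in tests.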
Expected output: You can call checkBudget("team-alpha") and get back a BudgetStatus with daily and monthly usage, limits, percentages, and an alert flag.
Step 6: Build the anomaly detector
The anomaly detector uses z-score analysis to flag cost spikes. Create src/services/anomaly-detector.ts:
ts
import type { CostSpan } from "@reaatech/llm-cost-telemetry";

interface AnomalyConfig {
  stdDevThreshold: number;
  windowSize: number;
}

interface AnomalyRecord {
  timestamp: Date;
  metric: string;
  severity: string;
  model: string;
  team: string;
  message: string;
}

export function zScoreAnomaly(values: number[], threshold: number): number[] {
  if (values.length < 2) {
    return [];
  }
  const mean = values.reduce((s, v) => s + v, 0) / values.length;
  const variance =
    values.reduce((s, v) => s + (v - mean) ** 2, 0) / values.length;
  const stdDev = Math.sqrt(variance);
  if (stdDev === 0) {
    return [];
  }
  const anomalies: number[] = [];
  for (let i = 0; i < values.length; i++) {
    const z = Math.abs((values[i] - mean) / stdDev);
    if (z > threshold) {
      anomalies.push(i);
    }
  }
  return anomalies;
}

export function detectAnomalies(
  metrics: CostSpan[],
  config: AnomalyConfig
): AnomalyRecord[] {
  if (metrics.length === 0) {
    return [];
  }
  const costs = metrics.map((m) => m.costUsd);
  const anomalyIndices = zScoreAnomaly(costs, config.stdDevThreshold);
  return anomalyIndices.map((i) => ({
    timestamp: new Date(metrics[i].timestamp ?? 0),
    metric: "cost",
    severity: "high",
    model: metrics[i].model,
    team: metrics[i].tenant ?? "unknown",
    message:
      "Cost anomaly detected: $" +
      String(metrics[i].costUsd) +
      " (z-score exceeds " +
      String(config.stdDevThreshold) +
      ")",
  }));
}

export type { AnomalyConfig, AnomalyRecord };
zScoreAnomaly() computes the mean and standard deviation of an array of numbers and returns the indices of values whose z-score exceeds the threshold. detectAnomalies() applies this to CostSpan costUsd values and returns human-readable anomaly records.
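One property of this approach is worth seeing in numbers. Because the variance divides by n (the population form), no single value among n data points can have a z-score above √(n − 1). With a threshold of 3, at least 11 spans are needed before any spike can be flagged. The sketch below restates only the z-score arithmetic so that it runs on its own; `zScores` and the two sample arrays are illustrative names, not part of the project.

```typescript
// Same mean / population-variance arithmetic as zScoreAnomaly(), but
// returning the z-score of every value instead of the flagged indices.
function zScores(values: number[]): number[] {
  const mean = values.reduce((s, v) => s + v, 0) / values.length;
  const variance =
    values.reduce((s, v) => s + (v - mean) ** 2, 0) / values.length;
  const stdDev = Math.sqrt(variance);
  return values.map((v) => (stdDev === 0 ? 0 : Math.abs(v - mean) / stdDev));
}

// Five spans with one large spike: the spike's z-score is sqrt(4) = 2,
// so a threshold of 3 never fires no matter how large the spike is.
const fiveSpans = [1, 1, 1, 1, 50];

// Twenty spans with the same spike: z-score is sqrt(19), about 4.36,
// which clears a threshold of 3.
const twentySpans = new Array<number>(19).fill(1).concat([50]);

const maxZFive = Math.max(...zScores(fiveSpans));
const maxZTwenty = Math.max(...zScores(twentySpans));
```

In practice this means short windows need a lower stdDevThreshold, or the detector should wait until enough spans have accumulated.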
Expected output: Passing cost data with a spike produces anomaly records with severity “high” and a descriptive message.
Step 7: Wire up the OpenTelemetry instrumentation
The instrumentation module ties everything together — it creates the OTel NodeSDK, registers cost and Langfuse span processors, and provides factory functions for instrumented OpenAI and Anthropic clients.
Create src/lib/instrumentation.ts:
ts
import { NodeSDK } from "@opentelemetry/sdk-node";
import type { SpanProcessor } from "@opentelemetry/sdk-trace-base";
import { OpenAIInstrumentation } from "@reaatech/otel-genai-semconv-openai";
import { AnthropicInstrumentation } from "@reaatech/otel-genai-semconv-anthropic";
import {
  SpanBuilder,
  type ProviderType,
  GEN_AI_ATTRIBUTES,
  COST_ATTRIBUTES,
} from "@reaatech/otel-genai-semconv-core";
import OpenAI from "openai";
import Anthropic from "@anthropic-ai/sdk";
import { initCostTracking } from "./cost.js";
import { createLangfuseSpanProcessor } from "./langfuse.js";

let sdk: NodeSDK | null = null;
let initialized = false;

export function initTelemetry(spanProcessors?: SpanProcessor[]): NodeSDK {
  if (initialized && sdk) {
    return sdk;
  }
  sdk = new NodeSDK({
    serviceName: "databricks-llm-observability",
    instrumentations: [],
    spanProcessors,
  });
  initialized = true;
  sdk.start();
  process.on("SIGTERM", () => {
    void shutdownTelemetry();
  });
  return sdk;
}

export async function initFullTelemetry(): Promise<NodeSDK> {
  const costProcessor = await initCostTracking();
  const langfuseProcessor = createLangfuseSpanProcessor();
  return initTelemetry([costProcessor, langfuseProcessor]);
}

export async function shutdownTelemetry(): Promise<void> {
  if (!initialized || !sdk) {
    return;
  }
  await sdk.shutdown();
  sdk = null;
  initialized = false;
}

export function createInstrumentedOpenAIClient(apiKey?: string): {
  client: OpenAI;
  instrumentation: OpenAIInstrumentation;
} {
  const client = new OpenAI({ apiKey });
  const instrumentation = new OpenAIInstrumentation({ trackCosts: true });
  instrumentation.instrument(client);
  return { client, instrumentation };
}

export function createInstrumentedAnthropicClient(apiKey?: string): {
  client: Anthropic;
  instrumentation: AnthropicInstrumentation;
} {
  const client = new Anthropic({ apiKey });
  const instrumentation = new AnthropicInstrumentation({ trackCosts: true });
  instrumentation.instrument(client);
  return { client, instrumentation };
}

export function getSDK(): NodeSDK | null {
  return sdk;
}

export function createSpanBuilder(provider: ProviderType): SpanBuilder {
  return new SpanBuilder({
    provider,
    addMessageEvents: true,
    addChoiceEvents: true,
  });
}

export { GEN_AI_ATTRIBUTES, COST_ATTRIBUTES };
The critical call is sdk.start() — without it the OTel pipeline is inert. initFullTelemetry() is the convenience function that wires both the cost-tracking processor and the Langfuse span processor in one call. Each instrumented client factory creates an SDK client and wraps it with the corresponding @reaatech/otel-genai-semconv instrumentation.
Expected output: When your app starts and calls initFullTelemetry(), all LLM spans are enriched with cost data and forwarded to Langfuse.
Step 8: Build the Express metrics server
The Express server exposes a /metrics endpoint for Prometheus scraping and a /health endpoint. It registers counters for request count, cost, input/output tokens, and a histogram for latency.
Create server/index.ts:
ts
import express from "express";
import cors from "cors";
import promClient from "prom-client";
import type { CostSpan } from "@reaatech/llm-cost-telemetry";
import type { ServerConfig } from "./types.js";

const { Registry, Counter, Histogram, collectDefaultMetrics } = promClient;

// A dedicated registry keeps these metrics separate from the global default.
const registry = new Registry();
collectDefaultMetrics({ register: registry });

const llmRequestsTotal = new Counter({
  name: "llm_requests_total",
  help: "Total number of LLM requests",
  registers: [registry],
});

const llmCostTotal = new Counter({
  name: "llm_cost_total",
  help: "Total cost of LLM requests in USD",
  registers: [registry],
});

const llmInputTokens = new Counter({
  name: "llm_input_tokens",
  help: "Total input tokens consumed",
  registers: [registry],
});

const llmOutputTokens = new Counter({
  name: "llm_output_tokens",
  help: "Total output tokens consumed",
  registers: [registry],
});

const llmLatencyMs = new Histogram({
  name: "llm_latency_ms",
  help: "Latency of LLM requests in milliseconds",
  buckets: [50, 100, 200, 500, 1000, 2000, 5000],
  registers: [registry],
});

export function updatePrometheusMetrics(span: CostSpan): void {
  llmRequestsTotal.inc(1);
  llmCostTotal.inc(span.costUsd);
  llmInputTokens.inc(span.inputTokens);
  llmOutputTokens.inc(span.outputTokens);
  // Note: this records a fixed 100 ms for every request, not a measured latency.
  llmLatencyMs.observe(100);
}

export function createMetricsServer(
  config?: ServerConfig
): { server: import("http").Server; app: express.Application } {
  const app = express();
  app.use(cors());

  // Prometheus scrape endpoint.
  app.get("/metrics", async (_req, res) => {
    const metrics = await registry.metrics();
    res.set("Content-Type", registry.contentType);
    res.send(metrics);
  });

  // Liveness check.
  app.get("/health", (_req, res) => {
    res.json({ status: "ok" });
  });

  const port = config?.port ?? (Number(process.env.METRICS_PORT) || 9090);
  const server = app.listen(port);
  return { server, app };
}

export { registry, llmRequestsTotal, llmCostTotal, llmLatencyMs };
The createMetricsServer() function returns both the HTTP server and the Express app. By default it listens on the port from METRICS_PORT env var (or 9090). The /metrics endpoint returns Prometheus-formatted text.
Expected output: After starting the server, curl http://localhost:9090/metrics returns Prometheus metrics including llm_requests_total, llm_cost_total, and llm_latency_ms.
Step 9: Create the Next.js API route handlers
The admin dashboard is powered by seven REST endpoints under app/api/observability/. Each route handler imports the in-memory store and returns JSON.
GET /api/observability/teams
ts
import { NextRequest, NextResponse } from "next/server";
import { store } from "../../../../src/services/observability-store.js";

export function GET(_req: NextRequest): NextResponse {
  void _req;
  const allMetrics = store.getAllMetrics();
  const teamMap = new Map<string, { spend: number; callCount: number }>();
  for (const m of allMetrics) {
    const tid = m.tenant ?? "default";
    const existing = teamMap.get(tid);
    if (existing) {
      existing.spend += m.costUsd;
      existing.callCount += 1;
    } else {
      teamMap.set(tid, { spend: m.costUsd, callCount: 1 });
    }
  }
  const teams = Array.from(teamMap.entries()).map(([teamId, data]) => ({
    teamId,
    spend: data.spend,
    callCount: data.callCount,
    budgetLimit: 100,
    budgetUsedPct: (data.spend / 100) * 100,
  }));
  return NextResponse.json({ teams });
}
The latency route requires a ?model= query parameter and returns latency percentiles for that model. In this listing every span contributes a fixed 100 ms value rather than a measured latency:
ts
import { NextRequest, NextResponse } from "next/server";
import { store } from "../../../../src/services/observability-store.js";

export function GET(req: NextRequest): NextResponse {
  const { searchParams } = new URL(req.url);
  const model = searchParams.get("model");
  if (!model) {
    return NextResponse.json(
      { error: "model query param is required" },
      { status: 400 }
    );
  }
  const allMetrics = store.getAllMetrics();
  const filtered = allMetrics.filter((m) => m.model === model);
  if (filtered.length === 0) {
    return NextResponse.json({ p50: 0, p90: 0, p95: 0, p99: 0, mean: 0 });
  }
  // Fixed 100 ms per span; no measured latency is read from the store here.
  const latencies = filtered.map(() => 100);
  const sorted = [...latencies].sort((a, b) => a - b);
  const n = sorted.length;
  const mean = sorted.reduce((s, v) => s + v, 0) / n;
  const p50 = sorted[Math.floor(n * 0.5)];
  const p90 = sorted[Math.floor(n * 0.9)];
  const p95 = sorted[Math.floor(n * 0.95)];
  const p99 = sorted[Math.floor(n * 0.99)];
  return NextResponse.json({ p50, p90, p95, p99, mean });
}
GET /api/observability/anomalies
Returns anomalies from the last 24 hours:
ts
import { NextRequest, NextResponse } from "next/server";
import { store } from "../../../../src/services/observability-store.js";

export async function GET(_req: NextRequest): Promise<NextResponse> {
  void _req;
  const since = new Date(Date.now() - 24 * 60 * 60 * 1000);
  const anomalies = await store.getAnomalies(since);
  return NextResponse.json({ anomalies });
}
GET /api/observability/summary
Requires ?start= and ?end= ISO date query parameters:
ts
import { NextRequest, NextResponse } from "next/server";
import { store } from "../../../../src/services/observability-store.js";

export function GET(req: NextRequest): NextResponse {
  const { searchParams } = new URL(req.url);
  const startStr = searchParams.get("start");
  const endStr = searchParams.get("end");
  if (!startStr || !endStr) {
    return NextResponse.json(
      { error: "start and end query params are required" },
      { status: 400 }
    );
  }
  const start = new Date(startStr);
  const end = new Date(endStr);
  const allMetrics = store.getAllMetrics();
  // Keep only spans whose timestamp falls inside [start, end].
  const filtered = allMetrics.filter(
    (m) =>
      new Date(m.timestamp ?? 0) >= start && new Date(m.timestamp ?? 0) <= end
  );
  const totalCalls = filtered.length;
  const totalCost = filtered.reduce((s, m) => s + m.costUsd, 0);
  const modelsUsed = new Set(filtered.map((m) => m.model)).size;
  const teamsActive = new Set(filtered.map((m) => m.tenant ?? "default")).size;
  return NextResponse.json({
    totalCalls,
    totalCost,
    // Fixed value in this listing; it is not computed from the spans.
    avgLatencyMs: 150,
    modelsUsed,
    teamsActive,
  });
}
GET /api/observability/models
Returns the top 10 most expensive models:
ts
import { NextRequest, NextResponse } from "next/server";
import { store } from "../../../../src/services/observability-store.js";

export async function GET(_req: NextRequest): Promise<NextResponse> {
  void _req;
  const models = await store.getTopModels(10);
  return NextResponse.json({ models });
}
GET /api/observability/timeseries
Returns cost bucketed by interval (minute, hour, or day):
ts
import { NextRequest, NextResponse } from "next/server";
import { store } from "../../../../src/services/observability-store.js";

export async function GET(req: NextRequest): Promise<NextResponse> {
  const { searchParams } = new URL(req.url);
  const interval = searchParams.get("interval") ?? "hour";
  if (!["minute", "hour", "day"].includes(interval)) {
    return NextResponse.json(
      { error: "interval must be minute, hour, or day" },
      { status: 400 }
    );
  }
  const end = new Date();
  const start = new Date(end.getTime() - 7 * 24 * 60 * 60 * 1000);
  const series = await store.getTimeSeries(start, end, interval);
  return NextResponse.json({ series });
}
Each handler uses NextRequest / NextResponse from next/server (not bare Request / Response) and the dynamic route ([teamId]) properly awaits the async params Promise, as required by Next.js 15+.
Expected output: Starting the dev server with pnpm dev and hitting http://localhost:3000/api/observability/models returns {"models":[]} (or populated data after metrics are recorded).
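The timeseries route delegates to store.getTimeSeries(), which is not shown in this section. One way the bucketing could work is to floor each timestamp to the start of its interval and sum costs per bucket. The sketch below assumes UTC-aligned buckets; `CostPoint`, `bucketCosts`, and the output shape are hypothetical, not the store's actual return type.

```typescript
type Interval = "minute" | "hour" | "day";

const INTERVAL_MS: Record<Interval, number> = {
  minute: 60 * 1000,
  hour: 60 * 60 * 1000,
  day: 24 * 60 * 60 * 1000,
};

interface CostPoint {
  timestamp: Date;
  costUsd: number;
}

// Sum cost per interval for points inside [start, end], oldest bucket first.
function bucketCosts(
  points: CostPoint[],
  start: Date,
  end: Date,
  interval: Interval
): { bucketStart: string; cost: number }[] {
  const size = INTERVAL_MS[interval];
  const totals = new Map<number, number>();
  for (const p of points) {
    const t = p.timestamp.getTime();
    if (t < start.getTime() || t > end.getTime()) {
      continue;
    }
    // Flooring epoch milliseconds aligns buckets to UTC minute/hour/day starts.
    const bucket = Math.floor(t / size) * size;
    totals.set(bucket, (totals.get(bucket) ?? 0) + p.costUsd);
  }
  return Array.from(totals.entries())
    .sort((a, b) => a[0] - b[0])
    .map(([ms, cost]) => ({ bucketStart: new Date(ms).toISOString(), cost }));
}
```

Empty intervals are simply absent from the result in this version; a chart that needs a continuous axis would have to fill the gaps with zero-cost buckets.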
Step 10: Create the Databricks SQL-backed store
For production use, the in-memory store won’t persist. The DatabricksStore wraps the @databricks/sql client to run queries against a Databricks SQL warehouse.
Create src/services/databricks-store.ts:
ts
import { DBSQLClient } from "@databricks/sql";
import type { CostSpan } from "@reaatech/llm-cost-telemetry";
import type { ObservabilityConfig } from "../lib/types.js";

interface UsageSummary {
  totalCalls: number;
  totalCost: number;
  modelsUsed: number;
}

interface TeamSpendRow {
  teamId: string;
  cost: number;
  callCount: number;
}

export function createDatabricksStore(
The store uses a withClient / withSession pattern to manage the Databricks SQL connection lifecycle. Each query method opens a session, executes a SQL statement, fetches results, and cleans up. insertMetric() writes a CostSpan into the llm_metrics table.
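Since the listing above is cut off before the helpers appear, here is a minimal sketch of what a withClient / withSession lifecycle pattern can look like. It is written against two small hypothetical interfaces, `ClientLike` and `SessionLike`, so that it runs without a warehouse. They are stand-ins for illustration and do not mirror the @databricks/sql API.

```typescript
// Hypothetical minimal interfaces; the real client and session types differ.
interface SessionLike {
  run(sql: string): Promise<unknown[]>;
  close(): Promise<void>;
}

interface ClientLike {
  openSession(): Promise<SessionLike>;
  close(): Promise<void>;
}

// Connect, hand the client to fn, and always close it afterwards,
// even when fn throws.
async function withClient<T>(
  connect: () => Promise<ClientLike>,
  fn: (client: ClientLike) => Promise<T>
): Promise<T> {
  const client = await connect();
  try {
    return await fn(client);
  } finally {
    await client.close();
  }
}

// Open a session, hand it to fn, and always close it afterwards.
async function withSession<T>(
  client: ClientLike,
  fn: (session: SessionLike) => Promise<T>
): Promise<T> {
  const session = await client.openSession();
  try {
    return await fn(session);
  } finally {
    await session.close();
  }
}

// A query method then nests the two helpers, so cleanup runs inside-out:
// the session closes first, then the client.
function runQuery(
  connect: () => Promise<ClientLike>,
  sql: string
): Promise<unknown[]> {
  return withClient(connect, (client) =>
    withSession(client, (session) => session.run(sql))
  );
}
```

The try/finally blocks are the point of the pattern: a failed query cannot leak an open session or connection.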
Expected output: With valid Databricks credentials in .env.local, you can call databricksStore.queryUsageSummary(start, end) and get back aggregated data from your SQL warehouse.
Step 11: Build the admin dashboard page
The Next.js app’s home page is a client component that fetches the summary endpoint and displays five key metrics in a card grid.
The dashboard fetches data from the last 24 hours on mount and displays five cards: Total Calls, Total Cost, Average Latency, Active Models, and Active Teams.
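The page component itself is not reproduced here. As a small sketch of the data-fetching side, the helpers below build the summary request for the last 24 hours and format a card value. `buildSummaryUrl` and `formatCardValue` are hypothetical names, and the "-" fallback is an assumption based on the expected output described for this step.

```typescript
const DAY_MS = 24 * 60 * 60 * 1000;

// Build the summary endpoint URL covering the 24 hours before `now`.
// The route requires ISO-formatted start and end query parameters.
function buildSummaryUrl(now: Date): string {
  const start = new Date(now.getTime() - DAY_MS).toISOString();
  const end = now.toISOString();
  return (
    "/api/observability/summary?start=" +
    encodeURIComponent(start) +
    "&end=" +
    encodeURIComponent(end)
  );
}

// Show "-" while a value has not loaded yet, otherwise the number as text.
function formatCardValue(value: number | undefined): string {
  return value === undefined ? "-" : String(value);
}
```

In a client component you would call `fetch(buildSummaryUrl(new Date()))` inside an effect on mount and pass each field of the JSON response through `formatCardValue` when rendering the five cards.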
Expected output: Opening http://localhost:3000 in your browser shows the dashboard with all metrics displaying “0” or “-” (until you record real metrics).
Step 12: Write and run the tests
The test suite covers every module: instrumentation, cost tracking, Langfuse integration, budget service, observability store, anomaly detector, and every API route handler. Here’s a representative test file for the instrumentation module.
"SELECT PERCENTILE(latency_ms, 0.5) as p50, PERCENTILE(latency_ms, 0.9) as p90, PERCENTILE(latency_ms, 0.95) as p95, PERCENTILE(latency_ms, 0.99) as p99, AVG(latency_ms) as mean FROM llm_metrics"