Databricks Code Sandbox for Secure SMB Data Analysis
An AI agent that translates natural language into safe SQL and Python queries, runs them on Databricks, and returns results with cost tracking and guardrails.
Small businesses with data in Databricks need ad‑hoc reports and analyses, but hiring a data engineer for every query isn’t feasible. Non‑technical staff often write inefficient or unsafe code, risking runaway costs.
A complete, working implementation of this recipe — downloadable as a zip or browsable file by file. Generated by our build pipeline; tested with full coverage before publishing.
This tutorial walks you through building the Databricks Code Sandbox — a Next.js application that lets users ask analytical questions in plain English and get safe SQL or Python results from their Databricks warehouse. An AI agent classifies intent, generates code via LLM, repairs malformed output, dry-runs the code in an E2B sandbox, enforces security policies through a firewall, tracks costs, and executes the approved query on Databricks — all with session continuity for multi-turn conversations.
You’ll use five REAA packages (confidence-router, structured-repair-core, tool-use-firewall-core, llm-cost-telemetry, session-continuity), the OpenAI SDK for code generation, the E2B sandbox for isolated dry-runs, and the Databricks SDK for query execution. The final app exposes three API routes (POST /api/analyze, POST+GET /api/session, GET /api/budget) and a single-page frontend.
Prerequisites
Node.js >= 22 and pnpm >= 9
A Databricks workspace with a SQL warehouse — the DATABRICKS_HOST, DATABRICKS_TOKEN, and DATABRICKS_WAREHOUSE_ID env vars
An OpenAI API key (or compatible provider via custom baseURL)
An E2B API key for the sandbox environment
A Langfuse account (optional — the app runs without it)
Familiarity with TypeScript, Next.js App Router, and async/await patterns
Step 1: Bootstrap the project and configure environment
Start by creating a Next.js project with the App Router:
Now create the .env.example with every variable your services will read:
env
# Env vars used by databricks-code-sandbox-for-secure-smb-data-analysis.# Keep placeholders only — never commit real values.NODE_ENV=development# DatabricksDATABRICKS_HOST=<your-databricks-host>DATABRICKS_TOKEN=<your-databricks-token>DATABRICKS_WAREHOUSE_ID=<your-warehouse-id># E2B SandboxE2B_API_KEY=<your-e2b-api-key>SANDBOX_TIMEOUT_MS=30000# LLM (OpenAI SDK — also works with DeepSeek via custom baseURL)OPENAI_API_KEY=<your-openai-key>OPENAI_MODEL=gpt-5.2-mini# Langfuse ObservabilityLANGFUSE_PUBLIC_KEY=<your-langfuse-public-key>LANGFUSE_SECRET_KEY=<your-langfuse-secret-key>LANGFUSE_HOST=<your-langfuse-host># Confidence RouterROUTE_THRESHOLD=0.8FALLBACK_THRESHOLD=0.3# Session & BudgetSESSION_TTL_SECONDS=3600MAX_BUDGET_USD=5.00
Copy it to .env.local and fill in your real credentials:
terminal
cp .env.example .env.local
Expected output: A Next.js project with all dependencies installed and a populated .env.local.
Step 2: Define shared types
Create src/lib/types.ts — these types flow through every layer of the pipeline:
Expected output: A 40-line file with all the type aliases and interfaces the application shares across modules.
Step 3: Build the intent classifier and LLM generator
Start with the LLM client in src/services/llm.ts. This creates an OpenAI client (with optional baseURL override) and generates code based on the classified intent:
ts
import OpenAI from "openai";import type { CodeGenInput } from "../lib/types.js";export function createLlmClient(baseURL?: string): OpenAI { const apiKey = process.env.OPENAI_API_KEY; if (!apiKey) { throw new Error("OPENAI_API_KEY not set"); } const config: { apiKey: string; baseURL?: string } = { apiKey }; if (baseURL) { config.baseURL = baseURL; } return new OpenAI(config);}export function getModelName(): string { return process.env.OPENAI_MODEL ?? "gpt-5.2-mini";}export class LlmError extends Error { status: number; step: string; constructor(message: string, status: number, step: string) { super(message); this.name = "LlmError"; this.status = status; this.step = step; }}export async function generateCode( input: CodeGenInput, baseURL?: string,): Promise<{ code: string; inputTokens: number; outputTokens: number }> { const apiKey = process.env.OPENAI_API_KEY; if (!apiKey) { throw new Error("OPENAI_API_KEY not set"); } const config: { apiKey: string; baseURL?: string } = { apiKey }; if (baseURL) { config.baseURL = baseURL; } const client = new OpenAI(config); const model = getModelName(); const systemPrompt = input.intent === "sql" ? "You are a SQL expert. Return only valid SQL code without markdown fences or explanation." : "You are a Python expert. Return only valid Python code without markdown fences or explanation."; try { const completion = await client.chat.completions.create({ model, messages: [ { role: "system", content: systemPrompt }, { role: "user", content: input.query }, ], temperature: 0.1, }); const code = completion.choices[0]?.message?.content ?? ""; const inputTokens = completion.usage?.prompt_tokens ?? 0; const outputTokens = completion.usage?.completion_tokens ?? 0; return { code, inputTokens, outputTokens }; } catch (err: unknown) { const status = typeof err === "object" && err !== null && "status" in err ? (err as { status: number }).status : 500; const message = err instanceof Error ? err.message : "OpenAI API error"; throw new LlmError(message, status, "generate"); }}
Now create the classifier in src/services/classifier.ts. It wraps @reaatech/confidence-router with a ConfidenceRouter that registers two KeywordClassifier instances — one for SQL keywords, one for Python keywords:
ts
import { ConfidenceRouter, KeywordClassifier,} from "@reaatech/confidence-router";import type { QueryIntent } from "../lib/types.js";export function createClassifier(config?: { routeThreshold?: number; fallbackThreshold?: number;}): ConfidenceRouter { const rt = config?.routeThreshold ?? Number(process.env.ROUTE_THRESHOLD || "0.8"); const ft = config?.fallbackThreshold ?? Number(process.env.FALLBACK_THRESHOLD || "0.3"); const router = new ConfidenceRouter({ routeThreshold: rt, fallbackThreshold: ft, }); const sqlClassifier = new KeywordClassifier([ { label: "sql", keywords: ["SELECT", "FROM", "WHERE", "JOIN", "GROUP BY"] }, ]); const pythonClassifier = new KeywordClassifier([ { label: "python", keywords: ["import", "def", "print", "pandas", "matplotlib"], }, ]); router.registerClassifier(sqlClassifier); router.registerClassifier(pythonClassifier); return router;}export async function classifyQuery( router: ConfidenceRouter, query: string,): Promise<QueryIntent> { const decision = await router.process(query); if (decision.type === "ROUTE") { return decision.target as QueryIntent; } if (decision.type === "CLARIFY") { return "sql"; } return "python";}
Expected output: Two service files. The LLM service wraps the OpenAI chat-completions API and returns the generated code string plus token counts. The classifier inspects the user query for SQL or Python keywords and returns a "sql" or "python" intent.
Step 4: Create the code repair and sandbox services
The repair service (src/services/code-repair.ts) uses @reaatech/structured-repair-core to parse and fix LLM output into well-structured code:
The sandbox service (src/services/sandbox.ts) runs the repaired code in an isolated E2B sandbox — a dry-run that catches syntax errors and runtime problems before execution hits Databricks:
ts
import Sandbox from "e2b";import type { SandboxResult } from "../lib/types.js";function escape(s: string): string { return s.replace(/\\/g, "\\\\").replace(/"/g, '\\"');}export async function runInSandbox( code: string, language: "python" | "sql", timeoutMs?: number,): Promise<SandboxResult> { if (!process.env.E2B_API_KEY) { throw new Error("E2B_API_KEY not set"); } const effectiveTimeout = timeoutMs ?? (Number(process.env.SANDBOX_TIMEOUT_MS) || 30000); const controller = new AbortController(); const timer = setTimeout(() => { controller.abort(); }, effectiveTimeout); const sandbox = await Sandbox.create(); try { let result: { stdout: string; stderr: string; exitCode?: number }; if (language === "python") { result = await sandbox.commands.run( `python3 -c "${escape(code)}"`, ); } else { result = await sandbox.commands.run( `echo "${escape(code)}" | sqlite3 :memory:`, ); } return { success: result.exitCode === 0, stdout: result.stdout, stderr: result.stderr, exitCode: result.exitCode ?? 0, }; } finally { clearTimeout(timer); void sandbox.kill(); }}
Expected output: The repair service can take raw LLM output (with markdown fences or JSON) and cleanly extract structured code. The sandbox service creates an E2B sandbox, runs the code, and always calls sandbox.kill() in a finally block.
Step 5: Build the firewall and Databricks services
The firewall service (src/services/firewall.ts) uses @reaatech/tool-use-firewall-core to block destructive operations. It registers SQL patterns (DROP TABLE, DELETE FROM, ALTER TABLE, TRUNCATE) and Python patterns (os.system, subprocess, shutil.rmtree, __import__), and also enforces a per-session MAX_BUDGET_USD cap:
The Databricks service (src/services/databricks.ts) creates a WorkspaceClient from the SDK and exposes two execution paths — SQL queries via warehouse and Python one-off jobs:
ts
import { WorkspaceClient, Config, ApiError, Time, TimeUnits } from "@databricks/sdk-experimental";export interface ExecuteSQLInput { warehouseId: string; sql: string;}export function createDatabricksClient(): WorkspaceClient { const host = process.env.DATABRICKS_HOST; const token = process.env.DATABRICKS_TOKEN; if (!host || !token) { throw new Error( "DATABRICKS_HOST and DATABRICKS_TOKEN must be set in environment", ); } const config = new Config({ host, token }); return new WorkspaceClient(config);}function generateRunName(): string { return `query-${String(Date.now())}`;}export async function executeSQL( client: WorkspaceClient, input: ExecuteSQLInput,): Promise<{ columns: string[]; rows: unknown[][]; _rawOutput: string }> { try { const waiter = await client.jobs.submit({ run_name: generateRunName(), tasks: [ { task_key: "sql-exec", sql_task: { warehouse_id: input.warehouseId, query: { query_id: "0" }, }, }, ], }); const run = await waiter.wait({ timeout: new Time(5, TimeUnits.minutes) }); const resultOutput = JSON.stringify(run); return { columns: [], rows: [], _rawOutput: resultOutput, }; } catch (err: unknown) { if (err instanceof ApiError) { throw new Error( `Databricks API error (${String(err.statusCode)}): ${err.message}`, ); } throw err; }}export async function executePythonJob( client: WorkspaceClient, code: string,): Promise<unknown> { const encoded = Buffer.from(code).toString("base64"); const result = await client.jobs.submit({ run_name: "sandbox-execution", tasks: [ { task_key: "main", spark_python_task: { python_file: `/tmp/${encoded}`, }, }, ], }); return result;}
Expected output: The firewall throws PolicyViolationError for dangerous SQL/Python patterns and BudgetExceededError when costs exceed the configured limit. The Databricks client connects to your workspace and submits SQL queries or Python jobs.
Step 6: Implement cost telemetry, session management, and observability
The cost telemetry service (src/services/cost-telemetry.ts) builds cost spans from LLM token usage using @reaatech/llm-cost-telemetry:
The session service (src/services/session.ts) implements IStorageAdapter with an in-memory Map backend and creates a SessionManager with token-budget-based compression:
ts
import { SessionManager, type IStorageAdapter, type Session, type Message, type SessionId, type MessageId, type TokenCounter, type SessionFilters, type MessageQueryOptions, type UpdateSessionOptions, type HealthStatus, ConcurrencyError, SessionNotFoundError, TokenBudgetExceededError,} from "@reaatech/session-continuity";export { TokenBudgetExceededError };export class SimpleTokenCounter implements TokenCounter { readonly model = "simple"; readonly tokenizer = "simple"
The observability service (src/services/telemetry.ts) wraps Langfuse for tracing. It gracefully returns null when credentials are absent:
Expected output: Three service files. Cost telemetry produces validated CostSpan objects with per-token pricing. Session management provides a full IStorageAdapter implementation with optimistic concurrency. Langfuse tracing is optional and no-ops when unconfigured.
Step 7: Wire the pipeline and API routes
The analysis pipeline orchestrator (src/api/analyze.ts) ties all services together. It exposes a runAnalysis function that runs classify, generate, repair, sandbox, firewall, execute, track, and persist — in order:
ts
import type { AnalysisRequest, AnalysisResult, FlowStep, QueryIntent } from "../lib/types.js";import { createClassifier, classifyQuery } from "../services/classifier.js";import { generateCode, getModelName } from "../services/llm.js";import { repairGeneratedCode } from "../services/code-repair.js";import { runInSandbox } from "../services/sandbox.js";import { CodeFirewall, checkCodeApproved } from "../services/firewall.js";import { createDatabricksClient, executeSQL, executePythonJob } from "../services/databricks.js";import { createCostSpan } from "../services/cost-telemetry.js";import { createSessionManager, addAnalysisTurn } from "../services/session.js";import { createLangfuseClient, traceOperation } from "../services/telemetry.js"
Now create the three Next.js App Router route handlers.
app/api/analyze/route.ts:
ts
import { type NextRequest, NextResponse } from "next/server";import { runAnalysis, createServices } from "../../../src/api/analyze.js";import type { AnalysisRequest } from "../../../src/lib/types.js";const services = createServices();export async function POST(req: NextRequest) { try { let body: AnalysisRequest; try { body = (await req.json()) as AnalysisRequest; } catch { return NextResponse.json({ error: "query required" }, { status: 400 }); } if ( typeof body.query !== "string" || body.query.trim().length === 0 ) { return NextResponse.json({ error: "query required" }, { status: 400 }); } const result = await runAnalysis(body, services); return NextResponse.json(result); } catch { return NextResponse.json( { error: "pipeline failure", step: "unknown" }, { status: 500 }, ); }}
import { type NextRequest, NextResponse } from "next/server";import { getSessionBudgetStatus } from "../../../src/services/cost-telemetry.js";export function GET(req: NextRequest) { const sessionId = req.nextUrl.searchParams.get("sessionId"); const status = getSessionBudgetStatus(sessionId ?? "default"); return NextResponse.json(status);}
Expected output: One orchestrator and three route handlers. The orchestrator runs all eight pipeline steps and gracefully returns partial results on any failure. The routes validate input, return proper HTTP status codes, and handle session-not-found errors.
Step 8: Build the frontend
Replace the scaffolded app/page.tsx with a chat-style single-page interface. It sends user queries to /api/analyze, displays the results, and loads session history on mount:
Update app/layout.tsx with the proper title, viewport settings, and global styles import:
tsx
import type { Metadata } from "next";import { Geist, Geist_Mono } from "next/font/google";import "./globals.css";const geistSans = Geist({ variable: "--font-geist-sans", subsets: ["latin"],});const geistMono = Geist_Mono({ variable: "--font-geist-mono", subsets: ["latin"],});export const viewport = { width: "device-width", initialScale: 1 };export const metadata: Metadata = { title: "Databricks Code Sandbox", description: "Safe natural-language analytics on Databricks — AI agent with sandbox, firewall, and cost controls.",};export default function RootLayout({ children }: { children: React.ReactNode }) { return ( <html lang="en" className={`${geistSans.variable} ${geistMono.variable}`}> <body>{children}</body> </html> );}
Expected output: A single chat-style page that accepts natural language queries, calls the API, and renders intent, generated code, sandbox output, Databricks results, cost, and collapsible session history.
Step 9: Write and run tests
The test suite uses vitest with MSW for HTTP mocking. Create tests/setup.ts to configure the mock server:
pnpm vitest run --coverage --reporter=json --outputFile=vitest-report.json
Expected output: All tests pass with exit code 0 — at least 60 total tests across 10 test files, with coverage thresholds above 90% for lines, branches, functions, and statements on runtime code.
Step 10: Type-check and lint
Run the quality gates to verify everything is wired correctly:
terminal
pnpm typecheckpnpm lint
Expected output: TypeScript compiles with zero errors. ESLint passes with no banned patterns.
Next steps
Add a persistent database — replace InMemorySessionAdapter with a Postgres or Redis-backed storage adapter for production usage
Integrate more data sources — extend executeSQL to support Snowflake, BigQuery, or Redshift via the Databricks federation feature
Improve the repair loop — when the sandbox returns a non-zero exit code, feed the error back into the LLM for iterative self-correction
Add user authentication — protect the API routes with NextAuth and tie budget limits to authenticated users instead of anonymous sessions
;
count(text: string): number {
return Math.ceil(text.length / 4);
}
countMessages(messages: Message[]): number {
return messages.reduce((sum, msg) => {
const content =
typeof msg.content === "string"
? msg.content
: JSON.stringify(msg.content);
return sum + Math.ceil(content.length / 4) + 4;
}, 0);
}
}
export class InMemorySessionAdapter implements IStorageAdapter {
private sessions = new Map<SessionId, Session>();
private messages = new Map<SessionId, Message[]>();