Vertex AI Knowledge Agent for BigQuery SMB Data Queries
Index business data from BigQuery into a vector knowledge base and let users ask natural language questions, with budget-aware caching and step-by-step answer generation.
Small businesses store critical data in BigQuery but non-technical staff can't write SQL; they wait days for reports or make decisions with stale information.
A complete, working implementation of this recipe — downloadable as a zip or browsable file by file. Generated by our build pipeline; tested with full coverage before publishing.
This recipe builds a RAG-powered knowledge agent that syncs business data from Google BigQuery into a pgvector knowledge base and answers natural language questions using Vertex AI (Gemini). It includes semantic caching to avoid redundant LLM calls, per-user budget enforcement, session continuity for multi-turn conversations, and confidence routing that escalates low-certainty answers for human review. You’ll use six REAA (Rapid Enterprise AI Architecture) packages and several third-party tools, all wired together in a Next.js 16 App Router project.
Prerequisites
Node.js >= 22 and pnpm 10 — the project uses pnpm workspaces and ES modules
Google Cloud Platform project with the BigQuery and Vertex AI APIs enabled
PostgreSQL instance with the pgvector extension (local or cloud, e.g. postgres://user:***@localhost:5432/knowledge_base)
VoyageAI API key — for generating embeddings (VOYAGE_API_KEY)
OpenAI API key — for semantic cache embeddings (OPENAI_API_KEY)
Langfuse account (free tier works) — for observability (LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY)
Familiarity with TypeScript, Next.js App Router, and basic SQL
Step 1: Scaffold the project and install dependencies
Create a new Next.js project with TypeScript and App Router. The recipe pins Next.js 16 and React 19 — using --use-pnpm with create-next-app ensures the project starts with the right package manager:
Enable the instrumentationHook in next.config.ts — this is required so the Langfuse observability hook fires at startup:
ts
import type { NextConfig } from "next";

const nextConfig: NextConfig = {
  experimental: {
    instrumentationHook: true,
  },
};

export default nextConfig;
Expected output: Your terminal may show pnpm WARN messages about peer dependencies, but no hard errors. Running pnpm typecheck should exit 0.
Step 2: Configure environment variables with Zod
Create a .env.example with all the environment variables the application reads:
env
# Env vars used by vertex-ai-knowledge-agent-for-bigquery-smb-data-queries.
# The builder adds entries here as it wires up each integration.
# Keep placeholders only; never commit real values.
NODE_ENV=development
GOOGLE_CLOUD_PROJECT=<your-gcp-project-id>
GOOGLE_CLOUD_LOCATION=us-central1
GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
VOYAGE_API_KEY=<your-voyage-api-key>
OPENAI_API_KEY=<your-openai-api-key>
DATABASE_URL=postgres://user:***@localhost:5432/knowledge_base
LANGFUSE_PUBLIC_KEY=<your-langfuse-public-key>
LANGFUSE_SECRET_KEY=<your-langfuse-secret-key>
LANGFUSE_HOST=https://cloud.langfuse.com
BIGQUERY_DATASET=<your-bigquery-dataset>
BIGQUERY_TABLE=<your-bigquery-table>
DEFAULT_DAILY_BUDGET=5.0
LLM_CACHE_SIMILARITY_THRESHOLD=0.85
LLM_CACHE_TTL=3600
Copy it to .env.local and fill in real values:
terminal
cp .env.example .env.local
# Edit .env.local with your actual credentials
Now create src/config.ts — a Zod-validated config that reads from process.env and transforms raw snake_case keys into a clean camelCase Config object:
Expected output: Every call to loadConfig() either returns a valid Config object or throws a Zod error with a human-readable message listing which variables are missing or invalid.
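The validate-then-transform behavior described above can be sketched without any dependency. The snippet below is a plain TypeScript stand-in for the Zod schema, not the recipe's src/config.ts: the Config field names, the subset of variables covered, the default values (taken from the placeholders in .env.example), and the error wording are all assumptions made for illustration. It shows snake_case keys mapped to camelCase fields, numeric coercion, and one error that lists every missing or invalid variable.

```typescript
// Hypothetical subset of the recipe's Config shape (field names are assumptions).
interface Config {
  googleCloudProject: string;
  googleCloudLocation: string;
  databaseUrl: string;
  defaultDailyBudget: number;
  llmCacheSimilarityThreshold: number;
  llmCacheTtl: number;
}

type Env = Record<string, string | undefined>;

// Reads a required string, recording a problem instead of throwing immediately.
function requireString(env: Env, key: string, problems: string[]): string {
  const value = env[key];
  if (value === undefined || value.trim() === "") {
    problems.push(`${key}: required`);
    return "";
  }
  return value;
}

// Reads an optional number with a fallback; records a problem if it is not numeric.
function optionalNumber(env: Env, key: string, fallback: number, problems: string[]): number {
  const raw = env[key];
  if (raw === undefined || raw.trim() === "") return fallback;
  const parsed = Number(raw);
  if (!Number.isFinite(parsed)) {
    problems.push(`${key}: expected a number, got "${raw}"`);
    return fallback;
  }
  return parsed;
}

// Validates the whole environment, then throws once with every problem listed.
function loadConfig(env: Env): Config {
  const problems: string[] = [];
  const config: Config = {
    googleCloudProject: requireString(env, "GOOGLE_CLOUD_PROJECT", problems),
    googleCloudLocation: env["GOOGLE_CLOUD_LOCATION"] ?? "us-central1",
    databaseUrl: requireString(env, "DATABASE_URL", problems),
    defaultDailyBudget: optionalNumber(env, "DEFAULT_DAILY_BUDGET", 5.0, problems),
    llmCacheSimilarityThreshold: optionalNumber(env, "LLM_CACHE_SIMILARITY_THRESHOLD", 0.85, problems),
    llmCacheTtl: optionalNumber(env, "LLM_CACHE_TTL", 3600, problems),
  };
  if (problems.length > 0) {
    throw new Error(`Invalid environment:\n${problems.join("\n")}`);
  }
  return config;
}
```

In an application this would be called as loadConfig(process.env). Taking the environment as a parameter keeps the function easy to test with a plain object.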
Step 3: Set up shared types
Create src/types.ts with the data shapes used across the sync and query pipelines:
Expected output: pnpm typecheck continues to exit 0. These types are imported by every service module.
Step 4: Build the BigQuery sync pipeline
The sync pipeline reads rows from a BigQuery table, converts each row to text, generates embeddings with VoyageAI, and upserts them into a pgvector table. It has three components: a BigQuery client, an embedder, and a vector store.
BigQuery client (src/sync/bigquery-client.ts)
This wraps the googleapis library to run SQL queries and stream table data as an async generator. Install it with pnpm add googleapis (already done in Step 1):
Expected output: pnpm typecheck passes. The streamRows method yields arrays of up to 500 rows at a time, letting the sync pipeline process large tables without loading everything into memory.
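The batching behavior of streamRows can be illustrated independently of BigQuery. In the sketch below, inBatches and rowToText are hypothetical helper names rather than the recipe's API, and the "column: value" text format is an assumption. Any async source of rows is grouped into arrays of at most 500, so only one batch is held in memory at a time.

```typescript
type Row = Record<string, unknown>;

// Groups rows from any async source into arrays of at most `batchSize` rows.
async function* inBatches(source: AsyncIterable<Row>, batchSize = 500): AsyncGenerator<Row[]> {
  if (batchSize < 1) throw new Error("batchSize must be at least 1");
  let batch: Row[] = [];
  for await (const row of source) {
    batch.push(row);
    if (batch.length === batchSize) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch; // final, possibly partial, batch
}

// Flattens a row into "column: value" text suitable for embedding.
// Null and undefined columns are skipped; nested values are JSON-encoded.
function rowToText(row: Row): string {
  return Object.entries(row)
    .filter(([, value]) => value !== null && value !== undefined)
    .map(([key, value]) => `${key}: ${typeof value === "object" ? JSON.stringify(value) : String(value)}`)
    .join("; ");
}
```

With 1,200 source rows and the default batch size, the generator yields batches of 500, 500, and 200 rows.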
Embedder (src/sync/embedder.ts)
Wraps the VoyageAI client to generate embeddings. It caches results per text string to avoid redundant API calls during batch processing and retries transient errors with exponential backoff:
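A minimal sketch of the caching and retry behavior described above, written against an injected embedFn so that it runs without the VoyageAI client. CachingEmbedder, EmbedFn, and the injectable sleep function are hypothetical names. For brevity the sketch retries every error, whereas a production embedder would retry only transient ones such as rate limits.

```typescript
// Stand-in for the embeddings API call (an assumption, not the VoyageAI client signature).
type EmbedFn = (texts: string[]) => Promise<number[][]>;

const defaultSleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

class CachingEmbedder {
  private cache = new Map<string, number[]>();
  private embedFn: EmbedFn;
  private maxRetries: number;
  private sleep: (ms: number) => Promise<void>;

  constructor(embedFn: EmbedFn, maxRetries = 3, sleep: (ms: number) => Promise<void> = defaultSleep) {
    this.embedFn = embedFn;
    this.maxRetries = maxRetries;
    this.sleep = sleep;
  }

  async embedBatch(texts: string[]): Promise<number[][]> {
    // Only unique texts that are not cached yet are sent to the API.
    const missing = Array.from(new Set(texts)).filter((t) => !this.cache.has(t));
    if (missing.length > 0) {
      const vectors = await this.withRetry(() => this.embedFn(missing));
      if (vectors.length !== missing.length) throw new Error("Embedding count mismatch");
      missing.forEach((text, i) => this.cache.set(text, vectors[i] as number[]));
    }
    // Every input text is now in the cache, so results keep the input order.
    return texts.map((t) => this.cache.get(t) as number[]);
  }

  // Retries a failed call with delays of 1 s, 2 s, 4 s, ... capped at 8 s.
  private async withRetry<T>(fn: () => Promise<T>): Promise<T> {
    for (let attempt = 0; ; attempt++) {
      try {
        return await fn();
      } catch (err) {
        if (attempt >= this.maxRetries) throw err;
        await this.sleep(Math.min(1000 * Math.pow(2, attempt), 8000));
      }
    }
  }
}
```

Because duplicates are removed before the call, a batch such as ["a", "bb", "a"] costs two embeddings rather than three, and a later request for "bb" costs none.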
Vector store (src/sync/vector-store.ts)
Uses pg + pgvector to manage a knowledge_items table with a vector(1024) column. It registers the pgvector types on every new pool connection:
ts
import { Pool, type ClientBase } from "pg";
import pgvector from "pgvector/pg";
import type { SourceRef } from "../types.js";

export class VectorStoreError extends Error {
  constructor(message: string, cause?: unknown) {
    super(message);
    this.name = "VectorStoreError";
    if (cause instanceof Error) this.cause = cause;
  }
}

export class VectorStore {
  private pool: Pool;
Expected output: The vector store connects to your PostgreSQL database and creates the knowledge_items table with a vector(1024) column and the pgvector extension.
Step 5: Wire up the sync orchestrator
Create src/sync/index.ts — this ties the three components together into a runSync() function that the /api/bigquery-sync endpoint calls. It streams rows from BigQuery, generates embeddings in batches, and upserts them into pgvector as it goes:
Expected output: After running, SyncResult shows how many rows were processed, how many embeddings were generated, and how many items were upserted into pgvector.
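The orchestration can be sketched with the three components passed in as plain functions. Everything named here is an assumption for illustration: the SyncDeps interface, the SyncResult field names, the use of an id column as the row key, and JSON.stringify as the row-to-text conversion. The recipe's own src/sync/index.ts may differ on each point.

```typescript
type Row = Record<string, unknown>;

interface KnowledgeItem {
  id: string;
  content: string;
  embedding: number[];
  metadata: Record<string, unknown>;
}

// Hypothetical dependency bundle standing in for the BigQuery client, embedder, and vector store.
interface SyncDeps {
  streamRows: () => AsyncIterable<Row[]>;
  embedBatch: (texts: string[]) => Promise<number[][]>;
  upsertBatch: (items: KnowledgeItem[]) => Promise<void>;
}

interface SyncResult {
  rowsProcessed: number;
  embeddingsGenerated: number;
  itemsUpserted: number;
}

// Streams batches, embeds each batch, and upserts it before fetching the next one.
async function runSync(deps: SyncDeps, sourceTable: string): Promise<SyncResult> {
  const result: SyncResult = { rowsProcessed: 0, embeddingsGenerated: 0, itemsUpserted: 0 };
  for await (const rows of deps.streamRows()) {
    const texts = rows.map((row) => JSON.stringify(row));
    const embeddings = await deps.embedBatch(texts);
    const items = rows.map((row, i): KnowledgeItem => {
      // Fall back to the running row index when the row has no id column.
      const sourceRowId = String(row["id"] ?? result.rowsProcessed + i);
      return {
        id: `${sourceTable}:${sourceRowId}`,
        content: texts[i] as string,
        embedding: embeddings[i] as number[],
        metadata: { sourceTable, sourceRowId },
      };
    });
    await deps.upsertBatch(items);
    result.rowsProcessed += rows.length;
    result.embeddingsGenerated += embeddings.length;
    result.itemsUpserted += items.length;
  }
  return result;
}
```

Deriving the item id from the table name and row key makes the sync idempotent: re-running it updates existing rows through the ON CONFLICT clause instead of inserting duplicates.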
Step 6: Create the Vertex AI LLM service
The LLM service wraps @google-cloud/vertexai to call the Gemini 1.5 Flash model with a system prompt tailored for business-data analysis. It retries failed calls up to three times with exponential backoff (each delay capped at 8 seconds) and checks for safety-blocked responses:
ts
import { VertexAI } from "@google-cloud/vertexai";

const SYSTEM_PROMPT =
  "You are a business data analyst. Answer questions using the provided context rows from a BigQuery table. Each row contains business metrics. When numbers are present, cite them. When confidence is low, say so. Keep answers concise.";

export class ContentBlockedError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ContentBlockedError";
  }
}

export class LlmService {
  private model: ReturnType<VertexAI["getGenerativeModel"]>;

  constructor(project: string, location: string) {
    const vertexAI = new VertexAI({ project, location });
    this.model = vertexAI.getGenerativeModel({
      model: "gemini-1.5-flash",
      systemInstruction: {
        role: "system" as const,
        parts: [{ text: SYSTEM_PROMPT }],
      },
    });
  }

  async generateAnswer(
    query: string,
    context: string[],
    conversationHistory?: Array<{ role: string; content: string }>,
  ): Promise<{ text: string; usage: { inputTokens: number; outputTokens: number } }> {
    // Retrieved rows are passed as a bulleted context block ahead of the question.
    const contextBlock = context.map((c) => `- ${c}`).join("\n");
    const userMessage = `Context rows:\n${contextBlock}\n\nQuestion: ${query}`;

    // Earlier turns go first so the model sees the conversation in order.
    const contents: Array<{ role: string; parts: Array<{ text: string }> }> = [];
    if (conversationHistory) {
      for (const msg of conversationHistory) {
        contents.push({ role: msg.role, parts: [{ text: msg.content }] });
      }
    }
    contents.push({ role: "user", parts: [{ text: userMessage }] });

    const result = await this.withRetry(async () => {
      const res = await this.model.generateContent({ contents });
      return res.response;
    });

    if (String(result.candidates?.[0]?.finishReason) === "SAFETY") {
      throw new ContentBlockedError("Response blocked due to safety concerns");
    }

    const text = result.candidates?.[0]?.content?.parts?.[0]?.text ?? "";
    const usage = result.usageMetadata ?? { promptTokenCount: 0, candidatesTokenCount: 0 };
    return {
      text,
      usage: {
        inputTokens: usage.promptTokenCount ?? 0,
        outputTokens: usage.candidatesTokenCount ?? 0,
      },
    };
  }

  // Retries any thrown error; delays double from 1 s and are capped at 8 s.
  private async withRetry<T>(
    fn: () => Promise<T>,
    maxRetries = 3,
  ): Promise<T> {
    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        return await fn();
      } catch (err) {
        if (attempt < maxRetries) {
          const delay = Math.min(1000 * Math.pow(2, attempt), 8000);
          await new Promise((resolve) => setTimeout(resolve, delay));
          continue;
        }
        throw err;
      }
    }
    throw new Error("Unreachable");
  }
}

export function createLlmService(
  project: string,
  location: string,
): LlmService {
  return new LlmService(project, location);
}
Expected output: The LLM service generates answers from Vertex AI using the context rows you retrieved from the vector store, and throws ContentBlockedError when a response is blocked for safety reasons.
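To make the retry timing concrete, the helper below reproduces the delay formula from withRetry as a pure function. backoffDelays is a hypothetical name introduced only for this illustration. With the default of three retries the waits are 1, 2, and 4 seconds, so the 8-second cap only takes effect from the fourth retry onward.

```typescript
// Lists the waits (in ms) between attempts, using the same formula as withRetry.
function backoffDelays(maxRetries = 3, baseMs = 1000, capMs = 8000): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    delays.push(Math.min(baseMs * Math.pow(2, attempt), capMs));
  }
  return delays;
}

console.log(backoffDelays());  // three waits: 1000, 2000, 4000
console.log(backoffDelays(5)); // five waits: 1000, 2000, 4000, 8000, 8000
```

In the worst case with the defaults, a request makes four attempts and spends 7 seconds waiting before the final error is thrown.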
Step 7: Wire up the REAA service layer
The query pipeline orchestrates six REAA packages. Create each service under src/api/.
Cache service (src/api/cache-service.ts)
Uses @reaatech/llm-cache with an in-memory adapter and OpenAI embeddings for semantic similarity lookup. When a query is semantically similar to a cached one, the cached response is returned directly, skipping the LLM call entirely:
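The lookup idea can be sketched independently of @reaatech/llm-cache. SemanticCache below is a hypothetical in-memory class, not that package's API. It assumes the query embedding is computed by the caller, and it uses the 0.85 similarity threshold and 3600-second TTL from .env.example as defaults.

```typescript
interface CacheEntry {
  embedding: number[];
  response: string;
  expiresAt: number; // epoch milliseconds
}

// Cosine similarity: 1 for identical direction, 0 for orthogonal vectors.
function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length) throw new Error("Embedding dimensions differ");
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    const x = a[i] as number;
    const y = b[i] as number;
    dot += x * y;
    normA += x * x;
    normB += y * y;
  }
  if (normA === 0 || normB === 0) return 0;
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

class SemanticCache {
  private entries: CacheEntry[] = [];
  private threshold: number;
  private ttlMs: number;

  constructor(threshold = 0.85, ttlSeconds = 3600) {
    this.threshold = threshold;
    this.ttlMs = ttlSeconds * 1000;
  }

  // Returns the most similar unexpired response at or above the threshold, if any.
  get(queryEmbedding: number[], now: number = Date.now()): string | undefined {
    let best: CacheEntry | undefined;
    let bestScore = this.threshold;
    for (const entry of this.entries) {
      if (entry.expiresAt <= now) continue; // expired
      const score = cosineSimilarity(queryEmbedding, entry.embedding);
      if (score >= bestScore) {
        best = entry;
        bestScore = score;
      }
    }
    return best?.response;
  }

  set(embedding: number[], response: string, now: number = Date.now()): void {
    this.entries.push({ embedding, response, expiresAt: now + this.ttlMs });
  }
}
```

A linear scan is adequate for a small in-memory cache. A higher threshold reduces the risk of returning an answer to a subtly different question, at the cost of fewer cache hits.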
Budget service (src/api/budget-service.ts)
Uses @reaatech/agent-budget-engine to enforce per-user daily spend caps. It fires threshold-breach and hard-stop events when limits are approached or exceeded:
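A rough sketch of the enforcement logic, independent of @reaatech/agent-budget-engine. DailyBudget, the 80% warning ratio, and the event payload shape are assumptions for illustration; only the two event names come from the description above.

```typescript
interface BudgetEvent {
  type: "threshold-breach" | "hard-stop";
  userId: string;
  spent: number;
  limit: number;
}

// Current UTC date as YYYY-MM-DD, used to scope spend to one day.
const today = (): string => new Date().toISOString().slice(0, 10);

class DailyBudget {
  private spent = new Map<string, number>(); // key: "userId|day"
  private warned = new Set<string>();
  private limit: number;
  private warnRatio: number;
  private onEvent: (event: BudgetEvent) => void;

  constructor(limit: number, onEvent: (event: BudgetEvent) => void, warnRatio = 0.8) {
    this.limit = limit;
    this.onEvent = onEvent;
    this.warnRatio = warnRatio;
  }

  // True while the user's spend for the given day is below the cap.
  canSpend(userId: string, day: string = today()): boolean {
    return (this.spent.get(`${userId}|${day}`) ?? 0) < this.limit;
  }

  // Adds a cost and emits an event when the warning ratio or the cap is reached.
  record(userId: string, costUsd: number, day: string = today()): void {
    const key = `${userId}|${day}`;
    const total = (this.spent.get(key) ?? 0) + costUsd;
    this.spent.set(key, total);
    if (total >= this.limit) {
      this.onEvent({ type: "hard-stop", userId, spent: total, limit: this.limit });
    } else if (total >= this.limit * this.warnRatio && !this.warned.has(key)) {
      this.warned.add(key); // warn once per user per day
      this.onEvent({ type: "threshold-breach", userId, spent: total, limit: this.limit });
    }
  }
}
```

Keying spend by user and day means the cap resets at midnight UTC without a scheduled job, although the state is lost whenever the process restarts.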
Session service (src/api/session-service.ts)
Uses @reaatech/session-continuity with an in-memory store and Tiktoken tokenizer for multi-turn conversations. It compresses conversation history when the token budget is exceeded:
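The simplest form of history compression is to drop the oldest turns until the rest fits the token budget, which is what the sketch below does. It is not the @reaatech/session-continuity algorithm: trimHistory is a hypothetical helper, and the four-characters-per-token estimate is a rough assumption standing in for Tiktoken.

```typescript
interface Message {
  role: "user" | "model";
  content: string;
}

// Crude token estimate (assumption): about four characters per token.
const estimateTokens = (text: string): number => Math.ceil(text.length / 4);

// Keeps the most recent messages whose combined token count fits within maxTokens.
function trimHistory(
  history: Message[],
  maxTokens: number,
  countTokens: (text: string) => number = estimateTokens,
): Message[] {
  const kept: Message[] = [];
  let used = 0;
  // Walk from newest to oldest and stop at the first message that does not fit.
  for (let i = history.length - 1; i >= 0; i--) {
    const message = history[i] as Message;
    const cost = countTokens(message.content);
    if (used + cost > maxTokens) break;
    kept.unshift(message); // restore chronological order
    used += cost;
  }
  return kept;
}
```

Passing the tokenizer as a parameter lets a real tokenizer replace the estimate without changing the trimming logic.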
Expected output: All six services compile cleanly (pnpm typecheck exits 0). Each one wraps a single REAA package behind a clean application interface.
Step 8: Build the query orchestrator
The query orchestrator (src/api/query-service.ts) ties all the services together into a single handleQuery() method. The flow is:
Budget check — if the user has exceeded their daily cap, return { error: "budget_exceeded", code: "BUDGET_EXCEEDED" }
Cache lookup — check semantic cache for similar questions; return immediately on hit
Session load — if a sessionId is provided, load conversation history
Embed + vector search — embed the query and find the 10 most similar rows
LLM generation — call Vertex AI Gemini with context and history
Confidence evaluation — check if the answer needs escalation
Cost recording — record the token usage with cost telemetry and budget
Session persistence — save the question and answer to the conversation
Cache storage — save the response for future similar queries
ts
import type { QueryRequest, QueryResponse } from "../types.js";
import type { LlmService } from "./llm-service.js";
import type { VectorStore } from "../sync/vector-store.js";
import type { CacheService, CacheResult } from "./cache-service.js";
import type { BudgetService } from "./budget-service.js";
import type { ConfidenceService } from "./confidence-service.js";
import type { SessionService } from "./session-service.js";
import type { CostTelemetryService } from "./cost-telemetry.js";
import type { Embedder } from "../sync/embedder.js";
export
Expected output: The query service returns a full QueryResponse with answer text, confidence score, source references, cached flag, cost in USD, and an escalated boolean if the answer needs human review.
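The nine-step flow listed above can be sketched as a single function over injected dependencies. The QueryDeps interface, the simplified request and response shapes, and the 0.6 escalation threshold are assumptions made for illustration. In the recipe each dependency is one of the service classes, and confidence comes from a separate confidence service rather than from the LLM call.

```typescript
interface QueryRequest { userId: string; question: string; sessionId?: string }
interface QueryResponse { answer: string; confidence: number; cached: boolean; escalated: boolean; costUsd: number }
interface BudgetError { error: "budget_exceeded"; code: "BUDGET_EXCEEDED" }
type Turn = { role: string; content: string };

// Hypothetical dependency bundle; each member stands in for one service.
interface QueryDeps {
  canSpend(userId: string): boolean;
  cacheGet(question: string): Promise<QueryResponse | undefined>;
  cacheSet(question: string, response: QueryResponse): Promise<void>;
  loadHistory(sessionId: string): Promise<Turn[]>;
  saveTurn(sessionId: string, question: string, answer: string): Promise<void>;
  embed(text: string): Promise<number[]>;
  search(embedding: number[], limit: number): Promise<string[]>;
  generate(question: string, context: string[], history: Turn[]): Promise<{ text: string; confidence: number; costUsd: number }>;
  recordCost(userId: string, costUsd: number): void;
}

async function handleQuery(
  req: QueryRequest,
  deps: QueryDeps,
  escalateBelow = 0.6, // assumed threshold for human review
): Promise<QueryResponse | BudgetError> {
  // 1. Budget check
  if (!deps.canSpend(req.userId)) return { error: "budget_exceeded", code: "BUDGET_EXCEEDED" };
  // 2. Cache lookup: a hit skips every later step and costs nothing
  const hit = await deps.cacheGet(req.question);
  if (hit) return { ...hit, cached: true, costUsd: 0 };
  // 3. Session load
  const history = req.sessionId ? await deps.loadHistory(req.sessionId) : [];
  // 4. Embed + vector search for the 10 most similar rows
  const context = await deps.search(await deps.embed(req.question), 10);
  // 5. LLM generation
  const llm = await deps.generate(req.question, context, history);
  // 6. Confidence evaluation
  const escalated = llm.confidence < escalateBelow;
  // 7. Cost recording
  deps.recordCost(req.userId, llm.costUsd);
  // 8. Session persistence
  if (req.sessionId) await deps.saveTurn(req.sessionId, req.question, llm.text);
  // 9. Cache storage
  const response: QueryResponse = {
    answer: llm.text,
    confidence: llm.confidence,
    cached: false,
    escalated,
    costUsd: llm.costUsd,
  };
  await deps.cacheSet(req.question, response);
  return response;
}
```

The order matters: the budget check and cache lookup come first because they are the cheapest steps and can end the request before any embedding or LLM call is paid for.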
Step 9: Expose API routes
Health check (app/api/health/route.ts)
A simple health endpoint that returns the server status and current timestamp:
ts
import { NextResponse } from "next/server";

export function GET(): NextResponse {
  return NextResponse.json({
    status: "healthy",
    timestamp: new Date().toISOString(),
  });
}
Query endpoint (app/api/query/route.ts)
This route instantiates all services at module scope (once at startup, not per-request) and validates incoming POST bodies with Zod. It also wires budget events into Langfuse for observability:
Instrumentation (instrumentation.ts)
The register() function sets up Langfuse at process startup and hooks into unhandled rejection and exception handlers. Make sure experimental.instrumentationHook: true is set in next.config.ts (done in Step 1):
Expected output: The Next.js dev server starts and exposes three endpoints:
terminal
pnpm dev
# GET http://localhost:3000/api/health → {"status":"healthy", ...}
# POST http://localhost:3000/api/bigquery-sync → {"status":"ok","result":{...}}
# POST http://localhost:3000/api/query → QueryResponse JSON
Step 10: Run the tests
The test suite covers every service and route handler. Each module has happy-path and error-case tests with mocked network dependencies. Run the full suite with coverage:
terminal
pnpm test
Expected output: All 115 tests pass across 34 test suites, and coverage meets the 90% threshold set in vitest.config.ts:
You can also run the type checker and linter individually:
terminal
pnpm typecheck
pnpm lint
Next steps
Swap in persistent storage — Replace the in-memory SpendStore and MemoryAdapter with Redis or PostgreSQL-backed adapters so budget state and sessions survive server restarts.
Add a web UI — Build a chat interface in app/page.tsx that calls POST /api/query with user input and streams rendered answers with source citations.
Schedule the sync — Use a cron trigger (e.g., Google Cloud Scheduler hitting POST /api/bigquery-sync daily) to keep the vector knowledge base fresh without manual intervention.
Add per-tenant budgets — Use the BudgetScope.Tenant enum from @reaatech/agent-budget-types to enforce spending limits across entire organizations.
  constructor(databaseUrl: string) {
    this.pool = new Pool({
      connectionString: databaseUrl,
    });
    this.pool.on("connect", (client: ClientBase) => {
      void pgvector.registerTypes(client);
    });
  }

  async ensureSchema(): Promise<void> {
    const client = await this.pool.connect();
    try {
      await client.query("CREATE EXTENSION IF NOT EXISTS vector");
      await client.query(`
        CREATE TABLE IF NOT EXISTS knowledge_items (
          id TEXT PRIMARY KEY,
          content TEXT NOT NULL,
          embedding vector(1024),
          source_table TEXT,
          source_row_id TEXT,
          metadata JSONB,
          created_at TIMESTAMPTZ DEFAULT NOW()
        )
      `);
    } finally {
      client.release();
    }
  }

  async upsert(
    id: string,
    content: string,
    embedding: number[],
    metadata: Record<string, unknown>,
  ): Promise<void> {
    const client = await this.pool.connect();
    try {
      await client.query(
        `INSERT INTO knowledge_items (id, content, embedding, source_table, source_row_id, metadata)
         VALUES ($1, $2, $3::vector, $4, $5, $6::jsonb)
         ON CONFLICT (id) DO UPDATE SET
           content = EXCLUDED.content,
           embedding = EXCLUDED.embedding,
           metadata = EXCLUDED.metadata`,
        [
          id,
          content,
          pgvector.toSql(embedding),
          metadata.sourceTable ?? null,
          metadata.sourceRowId ?? null,
          JSON.stringify(metadata),
        ],
      );
    } catch (err) {
      throw new VectorStoreError("Failed to upsert", err);
    } finally {
      client.release();
    }
  }

  async upsertBatch(
    items: Array<{
      id: string;
      content: string;
      embedding: number[];
      metadata: Record<string, unknown>;
    }>,
  ): Promise<void> {
    if (items.length === 0) return;
    const client = await this.pool.connect();
    try {
      for (const item of items) {
        await client.query(
          `INSERT INTO knowledge_items (id, content, embedding, source_table, source_row_id, metadata)
           VALUES ($1, $2, $3::vector, $4, $5, $6::jsonb)
           ON CONFLICT (id) DO UPDATE SET
             content = EXCLUDED.content,
             embedding = EXCLUDED.embedding,
             metadata = EXCLUDED.metadata`,
          [
            item.id,
            item.content,
            pgvector.toSql(item.embedding),
            item.metadata.sourceTable ?? null,
            item.metadata.sourceRowId ?? null,
            JSON.stringify(item.metadata),
          ],
        );
      }
    } catch (err) {
      throw new VectorStoreError("Failed to upsert batch", err);