@reaatech/otel-genai-semconv-instrumentation
Status: Pre-1.0 — APIs may change in minor versions. Pin to a specific version in production.
Core instrumentation framework for LLM observability. Provides tracer management, lifecycle hooks, streaming response handling, error classification, exponential-backoff retry, per-provider circuit breaking, chunk aggregation, and span-level PII redaction. All provider-specific instrumentation packages (otel-genai-semconv-openai, etc.) are built on top of this framework.
Installation
npm install @reaatech/otel-genai-semconv-instrumentation
# or
pnpm add @reaatech/otel-genai-semconv-instrumentation
Feature Overview
Tracer management — unified OTel tracer with context propagation, child spans, and auto-end helpers
Lifecycle hooks — onStart, onEnd, onError hooks with priority ordering and ID-based unregistration
Streaming instrumentation — automatic TTFT tracking, chunk counting, token accumulation, and stream wrapping
Chunk aggregation — reconstruct complete responses from streaming chunks, including tool call merging
Streaming events — configurable per-chunk or per-token event emission on spans
Error classification — 12 error types (rate limit, auth, timeout, content filter, etc.) with retryability flags
Retry handler — exponential backoff with jitter, configurable retryable error types, attempt metadata on spans
Circuit breaker — per-provider circuit breaking with half-open recovery, configurable thresholds, span metadata
Span processor — PII redaction and attribute filtering at span export time
Dual ESM/CJS output — works with import and require
Quick Start
import {
  TracerManager,
  HookManager,
  ErrorHandler,
  RetryHandler,
  CircuitBreaker,
} from "@reaatech/otel-genai-semconv-instrumentation";

// Create a tracer
const tracer = new TracerManager();
const span = tracer.startSpan("gen_ai.chat.completion");

// Register lifecycle hooks
const hooks = new HookManager();
hooks.onStart((ctx) => {
  ctx.span.setAttribute("custom.user_id", "abc123");
});
hooks.onEnd((ctx) => {
  ctx.span.setAttribute("custom.quality_score", 0.95);
});

// Classify errors (rateLimitError stands for an error caught from a provider call)
const errorHandler = new ErrorHandler();
const errorType = errorHandler.classifyError(rateLimitError);
// → LLMErrorType.RATE_LIMIT

// Retry with backoff (callLLM is a placeholder for your own provider call)
const retry = new RetryHandler({ maxRetries: 3 });
await retry.executeWithRetry(async (attempt) => {
  return await callLLM();
});

// Circuit break per provider
const breaker = new CircuitBreaker({ failureThreshold: 5 });
if (breaker.canExecute()) {
  breaker.recordSuccess();
}
API Reference
TracerManager (class)
Constructor
new TracerManager(options?: { tracerName?: string; tracerVersion?: string })
Methods
| Method | Description |
| --- | --- |
| getTracer() | Get or create the OTel tracer |
| startSpan(name, options?) | Start a new span |
| startChildSpan(parentSpan, name, options?) | Start a span as a child of another span |
| withSpan(span, fn) | Execute a sync function within a span context |
| withSpanAsync(span, fn) | Execute an async function within a span context |
| withAutoEndSpan(name, fn, options?) | Create a span that auto-ends when the async function resolves |
| withAutoEndSpanSync(name, fn, options?) | Create a span that auto-ends when the sync function returns |
| getCurrentSpan() | Get the current span from active context |
| getCurrentContext() | Get the current OTel context |
| setActiveSpan(span) | Set a span as active in context |
| inject(headers) | Inject trace context into headers |
| extract(headers) | Extract trace context from headers |
| reset() | Reset the tracer (useful for testing) |
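inject and extract move trace context through carrier headers. Assuming the W3C Trace Context propagator is in use (the usual OpenTelemetry default, though this README does not say which propagator the package registers), the carrier holds a traceparent header of the form version-traceId-spanId-flags. The sketch below formats and parses that header by hand to show what travels between services. formatTraceParent and parseTraceParent are hypothetical helpers, not package exports.

```typescript
interface TraceParent {
  traceId: string; // 32 lowercase hex characters
  spanId: string;  // 16 lowercase hex characters
  sampled: boolean;
}

// Serialize a version-00 traceparent header value.
function formatTraceParent(tp: TraceParent): string {
  return `00-${tp.traceId}-${tp.spanId}-${tp.sampled ? "01" : "00"}`;
}

// Parse a version-00 traceparent header; returns undefined when malformed.
function parseTraceParent(header: string): TraceParent | undefined {
  const match = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/.exec(header.trim());
  if (!match) return undefined;
  const traceId = match[1];
  const spanId = match[2];
  const flags = match[3];
  // All-zero trace or span IDs are invalid in W3C Trace Context.
  if (/^0+$/.test(traceId) || /^0+$/.test(spanId)) return undefined;
  // Bit 0 of the flags byte is the "sampled" flag.
  return { traceId, spanId, sampled: (parseInt(flags, 16) & 1) === 1 };
}

const incoming = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
const parsedTraceParent = parseTraceParent(incoming);
```

In real code the propagator does this work; the sketch only makes the header contents visible for debugging.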
HookManager (class)
Methods
| Method | Description |
| --- | --- |
| onStart(fn, options?) | Register an onStart hook, returns hook ID |
| onEnd(fn, options?) | Register an onEnd hook, returns hook ID |
| onError(fn, options?) | Register an onError hook, returns hook ID |
| unregister(id) | Remove a hook by ID |
| executeOnStart(context) | Execute all onStart hooks |
| executeOnEnd(context) | Execute all onEnd hooks |
| executeOnError(context) | Execute all onError hooks |
| clear() | Remove all hooks |
| getHookCount(type?) | Count registered hooks |
| listHooks(type?) | List hook IDs |
Hook Contexts
| Context | Properties |
| --- | --- |
| RequestHookContext | span, provider, model, requestId?, traceId?, spanId?, request |
| ResponseHookContext | span, provider, model, requestId?, traceId?, spanId?, response |
| ErrorHookContext | span, provider, model, requestId?, error, errorType? |
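Priority ordering and ID-based unregistration can be pictured with a small standalone registry. This is a sketch of the general technique, not the HookManager source. MiniHookRegistry is a hypothetical name, and the choice that a higher priority number runs first is an assumption, since the README does not state the direction.

```typescript
type Hook<C> = (context: C) => void;

interface HookEntry<C> {
  id: string;
  priority: number;
  fn: Hook<C>;
}

class MiniHookRegistry<C> {
  private hooks: HookEntry<C>[] = [];
  private nextId = 0;

  // Register a hook and return its ID for later removal.
  register(fn: Hook<C>, priority = 0): string {
    const id = `hook-${this.nextId++}`;
    this.hooks.push({ id, priority, fn });
    // Assumption: higher priority runs first. Array.prototype.sort is stable,
    // so hooks with equal priority keep their registration order.
    this.hooks.sort((a, b) => b.priority - a.priority);
    return id;
  }

  // Remove a hook by ID; returns true when something was removed.
  unregister(id: string): boolean {
    const before = this.hooks.length;
    this.hooks = this.hooks.filter((h) => h.id !== id);
    return this.hooks.length < before;
  }

  // Run every registered hook in priority order.
  execute(context: C): void {
    for (const hook of this.hooks) hook.fn(context);
  }
}
```

Returning the ID from register keeps callers from having to hold on to the function reference just to remove it later.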
StreamingHandler (class)
const handler = new StreamingHandler(span, { onChunk: (chunk) => console.log(chunk) });
handler.processChunk(chunk, tokenDelta);
handler.complete({ outputTokens: 500, finishReason: "stop" });
handler.error(new Error("stream interrupted"));
handler.getMetrics(); // { timeToFirstTokenMs, totalDurationMs, chunkCount, outputTokens, startTime }
instrumentStream(stream, span, options?)
Wraps an AsyncIterable to automatically instrument streaming:
import { instrumentStream } from "@reaatech/otel-genai-semconv-instrumentation";

const instrumented = instrumentStream(originalStream, span, {
  getTokenCount: (chunk) => chunk.usage?.completion_tokens,
});

for await (const chunk of instrumented) {
  // Each chunk is measured; TTFT, duration, and chunk count are captured automatically
}
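The wrapping idea can be sketched without the package: an async generator passes chunks through unchanged while recording the time of the first chunk and a running count. measureStream and its onDone callback are hypothetical names, and the injectable clock exists only to make the sketch testable. The metric field names mirror the getMetrics() comment above, but nothing here is taken from the package's implementation.

```typescript
interface StreamMetrics {
  timeToFirstTokenMs: number | undefined; // undefined when the stream yielded nothing
  totalDurationMs: number;
  chunkCount: number;
}

// Pass chunks through unchanged while timing the stream.
async function* measureStream<T>(
  stream: AsyncIterable<T>,
  onDone: (metrics: StreamMetrics) => void,
  now: () => number = Date.now,
): AsyncGenerator<T> {
  const start = now();
  let firstChunkAt: number | undefined;
  let chunkCount = 0;
  try {
    for await (const chunk of stream) {
      if (firstChunkAt === undefined) firstChunkAt = now();
      chunkCount += 1;
      yield chunk;
    }
  } finally {
    // Runs on normal completion, on error, and when the consumer stops early.
    onDone({
      timeToFirstTokenMs: firstChunkAt === undefined ? undefined : firstChunkAt - start,
      totalDurationMs: now() - start,
      chunkCount,
    });
  }
}
```

Reporting from the finally block means metrics are still delivered when the consumer breaks out of the loop or the source stream throws.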
ErrorHandler (class)
Methods
| Method | Description |
| --- | --- |
| classifyError(error) | Classify an error to LLMErrorType |
| isRetryable(errorType) | Check if an error type should be retried |
| captureError(span, error, context?) | Capture error on a span (status, exception, attributes) |
| getUserMessage(errorType) | Get a user-friendly error message |
LLMErrorType (enum)
| Value | Retryable | Description |
| --- | --- | --- |
| RATE_LIMIT | Yes | 429 / rate limit exceeded |
| AUTHENTICATION | No | 401 / invalid API key |
| AUTHORIZATION | No | 403 / permission denied |
| INVALID_REQUEST | No | 400 / bad request |
| NOT_FOUND | No | 404 / model not found |
| VALIDATION | No | Validation error |
| SERVER | Yes | 5xx / server error |
| TIMEOUT | Yes | Request timed out |
| NETWORK | Yes | Connection / fetch failure |
| CONTENT_FILTER | No | Content filtered / safety |
| QUOTA_EXCEEDED | No | Quota exhausted |
| UNKNOWN | No | Unclassified error |
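The HTTP-status rows of the table can be read as a lookup. The sketch below encodes only those rows; ErrorKind, classifyStatus, and isRetryableStatus are hypothetical stand-ins, and how the package's classifyError actually inspects an error (status codes, messages, error classes) is not documented here, so treat the matching logic as an assumption.

```typescript
// Hypothetical stand-in for LLMErrorType, limited to the status-code rows.
type ErrorKind =
  | "RATE_LIMIT"
  | "AUTHENTICATION"
  | "AUTHORIZATION"
  | "INVALID_REQUEST"
  | "NOT_FOUND"
  | "SERVER"
  | "UNKNOWN";

// Retryable flags copied from the table above.
const RETRYABLE: Record<ErrorKind, boolean> = {
  RATE_LIMIT: true,
  AUTHENTICATION: false,
  AUTHORIZATION: false,
  INVALID_REQUEST: false,
  NOT_FOUND: false,
  SERVER: true,
  UNKNOWN: false,
};

// Map an HTTP status code to an error kind.
function classifyStatus(status: number): ErrorKind {
  if (status === 429) return "RATE_LIMIT";
  if (status === 401) return "AUTHENTICATION";
  if (status === 403) return "AUTHORIZATION";
  if (status === 400) return "INVALID_REQUEST";
  if (status === 404) return "NOT_FOUND";
  if (status >= 500 && status <= 599) return "SERVER";
  return "UNKNOWN";
}

function isRetryableStatus(status: number): boolean {
  return RETRYABLE[classifyStatus(status)];
}
```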
RetryHandler (class)
Constructor
new RetryHandler({
  maxRetries: 3,
  initialDelayMs: 1000,
  maxDelayMs: 30000,
  backoffMultiplier: 2,
  jitter: true,
  retryableErrorTypes: [LLMErrorType.RATE_LIMIT, LLMErrorType.TIMEOUT],
})
Methods
| Method | Description |
| --- | --- |
| canRetry() | Check if more retries are available |
| shouldRetry(errorType) | Check if this error type should be retried |
| calculateDelay() | Calculate the delay for the next attempt |
| recordAttempt(delayMs, error?, errorType?) | Record a retry attempt |
| recordOnSpan(span) | Add retry metadata to a span (attempt count, events) |
| executeWithRetry(fn, onError?) | Execute a function with full retry logic |
| getTotalDelay() | Total delay across all retries |
| getAttempts() | All recorded retry attempts |
| reset() | Reset retry state |
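To make the constructor options concrete, here is a sketch of a common exponential backoff formula: delay = min(maxDelayMs, initialDelayMs × backoffMultiplier^attempt). backoffDelay is a hypothetical helper, and the jitter strategy shown ("full jitter", a uniform pick below the capped delay) is an assumption; the package may compute jitter differently.

```typescript
interface BackoffOptions {
  initialDelayMs: number;
  maxDelayMs: number;
  backoffMultiplier: number;
  jitter: boolean;
}

// attempt is zero-based: 0 is the wait before the first retry.
// random is injectable so the jittered path can be tested deterministically.
function backoffDelay(
  attempt: number,
  opts: BackoffOptions,
  random: () => number = Math.random,
): number {
  const exponential = opts.initialDelayMs * Math.pow(opts.backoffMultiplier, attempt);
  const capped = Math.min(exponential, opts.maxDelayMs);
  // Assumed strategy: "full jitter" picks uniformly in [0, capped).
  return opts.jitter ? Math.floor(random() * capped) : capped;
}

const backoffOpts: BackoffOptions = {
  initialDelayMs: 1000,
  maxDelayMs: 30000,
  backoffMultiplier: 2,
  jitter: false,
};
const backoffSchedule = [0, 1, 2, 3, 4, 5].map((n) => backoffDelay(n, backoffOpts));
// backoffSchedule is [1000, 2000, 4000, 8000, 16000, 30000]; the last value is capped by maxDelayMs
```

Jitter spreads retries from many clients over time so they do not all hit a recovering provider at the same instant.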
CircuitBreaker (class)
Constructor
new CircuitBreaker({
  failureThreshold: 5,
  successThreshold: 3,
  recoveryTimeoutMs: 60000,
  failureErrorTypes: [LLMErrorType.RATE_LIMIT, LLMErrorType.TIMEOUT],
})
Circuit States
| State | Description |
| --- | --- |
| CLOSED | Normal operation: requests flow through |
| OPEN | Failures exceeded threshold: requests are blocked |
| HALF_OPEN | Recovery timeout elapsed: testing if service recovered |
Methods
| Method | Description |
| --- | --- |
| getState() | Current state (auto-transitions OPEN → HALF_OPEN) |
| canExecute() | Whether a request should be allowed |
| recordSuccess(span?) | Record a successful call |
| recordFailure(errorType, span?) | Record a failed call |
| recordOnSpan(span) | Add circuit breaker state to span attributes |
| getStats() | Current statistics (requests, failures, state) |
| reset() | Reset to CLOSED state |
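The three states and the constructor thresholds fit together as a small state machine. The sketch below is one plausible reading, not the package's code. MiniCircuitBreaker is a hypothetical class, and two details are assumptions: failures are counted consecutively (a success in CLOSED resets the count), and any failure in HALF_OPEN reopens the circuit immediately.

```typescript
type CircuitState = "CLOSED" | "OPEN" | "HALF_OPEN";

interface BreakerOptions {
  failureThreshold: number;
  successThreshold: number;
  recoveryTimeoutMs: number;
}

class MiniCircuitBreaker {
  private state: CircuitState = "CLOSED";
  private failures = 0;
  private successes = 0;
  private openedAt = 0;
  private readonly opts: BreakerOptions;
  private readonly now: () => number;

  // The clock is injectable so transitions can be tested without waiting.
  constructor(opts: BreakerOptions, now: () => number = Date.now) {
    this.opts = opts;
    this.now = now;
  }

  // Reading the state performs the OPEN -> HALF_OPEN transition once the timeout has elapsed.
  getState(): CircuitState {
    if (this.state === "OPEN" && this.now() - this.openedAt >= this.opts.recoveryTimeoutMs) {
      this.state = "HALF_OPEN";
      this.successes = 0;
    }
    return this.state;
  }

  canExecute(): boolean {
    return this.getState() !== "OPEN";
  }

  recordSuccess(): void {
    if (this.getState() === "HALF_OPEN") {
      this.successes += 1;
      if (this.successes >= this.opts.successThreshold) {
        this.state = "CLOSED";
        this.failures = 0;
      }
    } else {
      this.failures = 0; // assumption: only consecutive failures count
    }
  }

  recordFailure(): void {
    const current = this.getState();
    this.failures += 1;
    // Assumption: a single failure while HALF_OPEN reopens the circuit.
    if (current === "HALF_OPEN" || this.failures >= this.opts.failureThreshold) {
      this.state = "OPEN";
      this.openedAt = this.now();
    }
  }
}
```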
CircuitBreakerRegistry (class)
Manages per-provider circuit breakers:
const registry = new CircuitBreakerRegistry();
const openaiBreaker = registry.get("openai");
const anthropicBreaker = registry.get("anthropic");
registry.getAllStats(); // Map<string, CircuitStats>
ChunkAggregator (class)
Accumulates streaming chunks into a complete response:
const aggregator = new ChunkAggregator();
aggregator.addText("Hello");
aggregator.addText(", world!");
aggregator.setFinishReason("stop");
aggregator.setModel("gpt-4");
aggregator.setTokenUsage(50, 10);
const response = aggregator.build();
// { content: "Hello, world!", finishReason: "stop", model: "gpt-4", ... }
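Tool call merging is the less obvious half of aggregation: providers typically stream a tool call as fragments, and the argument JSON is only valid once the pieces are concatenated. The sketch below shows the general technique with a hypothetical ToolCallDelta shape, loosely modeled on OpenAI-style streaming deltas; the ChunkAggregator's own method names for tool calls are not documented in this README, so none are used here.

```typescript
// Hypothetical fragment shape: fragments of the same call share an index.
interface ToolCallDelta {
  index: number;
  id?: string;
  name?: string;
  argumentsDelta?: string;
}

interface ToolCall {
  id: string;
  name: string;
  arguments: string; // concatenated JSON text
}

// Merge fragments into complete tool calls, ordered by index.
function mergeToolCalls(deltas: ToolCallDelta[]): ToolCall[] {
  const byIndex = new Map<number, ToolCall>();
  for (const delta of deltas) {
    const call = byIndex.get(delta.index) ?? { id: "", name: "", arguments: "" };
    if (delta.id) call.id = delta.id;
    if (delta.name) call.name = delta.name;
    if (delta.argumentsDelta) call.arguments += delta.argumentsDelta;
    byIndex.set(delta.index, call);
  }
  return Array.from(byIndex.entries())
    .sort((a, b) => a[0] - b[0])
    .map((entry) => entry[1]);
}

const mergedCalls = mergeToolCalls([
  { index: 0, id: "call_1", name: "get_weather", argumentsDelta: '{"ci' },
  { index: 1, id: "call_2", name: "get_time", argumentsDelta: "{}" },
  { index: 0, argumentsDelta: 'ty":"Paris"}' },
]);
// mergedCalls[0].arguments is '{"city":"Paris"}'
```

Parsing the arguments should wait until the stream ends, because any intermediate concatenation may be incomplete JSON.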
SpanProcessor (class)
Implements the OTel SpanProcessor interface with PII redaction:
new SpanProcessor({
  piiRedactionEnabled: true,
  redactMessageContent: false,
  customAttributes: { "environment": "production" },
})
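Attribute-level redaction can be sketched as a pure function that rewrites string values before export. The patterns, placeholder strings, and the redactAttributes name below are all illustrative assumptions; the README does not list which PII categories the SpanProcessor detects or how it masks them.

```typescript
type AttributeValue = string | number | boolean;

// Illustrative patterns only; real PII detection needs a broader, audited set.
const EMAIL_PATTERN = /[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}/g;
const US_SSN_PATTERN = /\b\d{3}-\d{2}-\d{4}\b/g;

// Return a copy of the attributes with matches in string values masked.
function redactAttributes(
  attributes: Record<string, AttributeValue>,
): Record<string, AttributeValue> {
  const redacted: Record<string, AttributeValue> = {};
  for (const key of Object.keys(attributes)) {
    const value = attributes[key];
    redacted[key] =
      typeof value === "string"
        ? value
            .replace(EMAIL_PATTERN, "[REDACTED_EMAIL]")
            .replace(US_SSN_PATTERN, "[REDACTED_SSN]")
        : value; // numbers and booleans pass through untouched
  }
  return redacted;
}

const redactedExample = redactAttributes({
  "custom.user_note": "Contact jane.doe@example.com, SSN 123-45-6789",
  "custom.token_count": 42,
});
// redactedExample["custom.user_note"] is "Contact [REDACTED_EMAIL], SSN [REDACTED_SSN]"
```

Working on a copy leaves the original attribute object intact, which matters if other processors read the same span.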
Usage Patterns
Retry with Custom Error Handler
const retry = new RetryHandler({ maxRetries: 3 });
const errorHandler = new ErrorHandler();

try {
  const result = await retry.executeWithRetry(
    async (attempt) => await openai.chat.completions.create(params),
    async (error, attempt) => {
      const type = errorHandler.classifyError(error);
      return errorHandler.isRetryable(type) && attempt < 3;
    }
  );
} catch (err) {
  errorHandler.captureError(span, err as Error);
}
Circuit Breaking with Span Metadata
const breaker = createCircuitBreaker({ failureThreshold: 3, recoveryTimeoutMs: 30000 });

if (!breaker.canExecute()) {
  throw new Error("Circuit is open; try again later");
}

try {
  const result = await makeLLMCall();
  breaker.recordSuccess(span);
} catch (error) {
  const type = errorHandler.classifyError(error);
  breaker.recordFailure(type, span);
  throw error;
}
Streaming with Automatic TTFT
const span = tracer.startSpan("gen_ai.chat.completion");

const instrumented = instrumentStream(stream, span, {
  getTokenCount: (chunk) => {
    // extract token count from provider-specific chunk format
    return chunk.usage?.completion_tokens;
  },
});

for await (const chunk of instrumented) {
  // TTFT, chunk count, and output tokens are captured automatically
}
// On stream end, the span is auto-finalized with gen_ai.streaming.* attributes
License
MIT