@reaatech/media-pipeline-mcp-core

npm v0.3.0

Status: Pre-1.0 — APIs may change in minor versions. Pin to a specific version in production.

Core framework for media pipeline orchestration. Provides the complete type system (Zod-validated), pipeline execution engine with variable interpolation, validation with provider availability checks, quality gate evaluation (threshold, dimension-check, LLM-judge, custom), artifact registry, budget enforcement, persistence-based resume, cost tracking, event bus, and a configurable mock provider for testing.

Installation

terminal
npm install @reaatech/media-pipeline-mcp-core
# or
pnpm add @reaatech/media-pipeline-mcp-core

Feature Overview

  • Pipeline execution engine — sequential step processing with {{step_id.output}} variable interpolation, timeout handling, budget preflight, persistence-based resume, and run lifecycle management
  • Zod-validated type system — 20+ schemas for Pipeline, Step, Artifact, QualityGate, CostRecord, PipelineEvent, VariantsConfig, RunContext, and more
  • Quality gate evaluation: threshold (numeric checks on metadata fields), dimension-check (output dimensions with tolerance), llm-judge (LLM evaluates quality), and custom (user-provided function) with retry/gating/fail/warn actions
  • Artifact registry — in-memory artifact tracking with CRUD operations, source-step lookup, batch deletion by source step, and latest-artifact retrieval
  • Pipeline validation — Zod schema validation, duplicate step detection, path-traversal checks, circular/forward reference detection, provider availability verification, quality gate config validation, budget and variants config validation
  • Pipeline estimation — dry-run cost estimation with per-step usdLow/usdHigh bands, provider-specific pricing, router-spread warnings, and prior-step dependency detection
  • Mock provider — configurable simulated provider with delay, failure rate, base cost, and operation-specific artifact type/mime generation for development and testing
  • Event bus — typed event emitter with kind-based subscription, promise-based await, and optional timeout
  • Budget enforcement: abort/suspend on budget exceed with configurable warning thresholds
  • Persistence integration: PipelineStateStore and CostLedger injection for run durability and cost recording
  • Provenance signing — C2PA manifest callback with pipeline definition hash and ingredient tracking
  • Extensible provider interface: Provider contract with execute, healthCheck, and optional estimateCost
  • Stateful resume (F3) — persistence-backed resume from any step with lock acquisition and step state tracking
  • Adapter injection callbacks — route-based provider selection, variants execution, ratio fan-out, context resolution, gate evaluation, and tenant policy enforcement via injected callbacks
  • 30+ typed error classes — categorized errors with code, retryable flags, and structured payloads for BudgetExceededError, ArtifactNotFoundError, VariantsAllRejectedError, SafetyGateRejectedError, and more

Quick Start

typescript
import {
  PipelineExecutor,
  PipelineValidator,
  ArtifactRegistry,
  MockProvider,
} from "@reaatech/media-pipeline-mcp-core";
 
const executor = new PipelineExecutor({
  providers: [new MockProvider()],
  defaultStepTimeoutMs: 60000,
});
 
const result = await executor.execute({
  id: "product-photo",
  steps: [
    {
      id: "generate",
      operation: "mock.generate",
      inputs: { prompt: "A sunset over mountains" },
      config: { dimensions: "1024x1024" },
      qualityGate: {
        type: "threshold",
        config: { checks: [{ field: "metadata.width", operator: ">=", value: 1024 }] },
        action: "retry",
        maxRetries: 2,
      },
    },
    {
      id: "upscale",
      operation: "mock.transform",
      inputs: { artifact_id: "{{generate.output}}" },
      config: { scale: "4x" },
    },
  ],
  budget: {
    maxUsd: 1.0,
    onExceed: "abort",
    warnAtPct: 0.8,
  },
});
 
console.log(result.status); // "completed"
console.log(result.artifacts.size); // 2
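
The `{{generate.output}}` placeholder in the second step refers to the artifact produced by an earlier step. As a rough illustration of the idea only (not the package's implementation), the substitution could be sketched as a regex replace over string inputs. The function name `interpolateInputs` and the shape of the `outputs` map are hypothetical.

```typescript
// Hypothetical sketch: resolve {{step_id.output}} placeholders against the
// artifact IDs produced by earlier steps. Not the library's actual code.
const PLACEHOLDER = /\{\{\s*([\w-]+)\.output\s*\}\}/g;

function interpolateInputs(
  inputs: Record<string, unknown>,
  outputs: Map<string, string>, // step ID -> artifact ID (assumed shape)
): Record<string, unknown> {
  const resolved: Record<string, unknown> = {};
  for (const key of Object.keys(inputs)) {
    const value = inputs[key];
    if (typeof value !== "string") {
      resolved[key] = value; // only string values can hold placeholders here
      continue;
    }
    resolved[key] = value.replace(PLACEHOLDER, (_match: string, stepId: string) => {
      const artifactId = outputs.get(stepId);
      if (artifactId === undefined) {
        throw new Error(`Step "${stepId}" has no output yet`);
      }
      return artifactId;
    });
  }
  return resolved;
}

const outputs = new Map<string, string>([["generate", "artifact-123"]]);
const upscaleInputs = interpolateInputs({ artifact_id: "{{generate.output}}" }, outputs);
console.log(upscaleInputs.artifact_id); // artifact-123
```

Throwing on an unresolved reference is one possible design; it surfaces ordering mistakes at the step that depends on the missing output.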

API Reference

PipelineExecutor

The core execution engine for running pipelines with step-by-step processing, quality gate evaluation, budget enforcement, and variable interpolation.

typescript
class PipelineExecutor {
  constructor(options: PipelineExecutorOptions);
  execute(definition: PipelineDefinition, options?: { runId?: string }): Promise<Pipeline>;
  resume(runId: string, fromStepId?: string): Promise<Pipeline>;
  resume(pipeline: Pipeline, action: 'retry' | 'skip' | 'abort'): Promise<Pipeline>;
  estimate(definition: PipelineDefinition): Promise<PipelineEstimate>;
  getRegistry(): ArtifactRegistry;
}

PipelineExecutorOptions

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| providers | Provider[] | required | Provider instances registered by operation |
| defaultPipelineTimeoutMs | number | 300000 | Max pipeline wall-clock time |
| defaultStepTimeoutMs | number | | Max per-step execution time |
| llmJudgeFn | (prompt, artifact) => Promise<{pass,reasoning,score?}> | | LLM-based quality evaluation |
| customCheckFn | (artifact, config) => boolean \| Promise<boolean> | | Custom quality gate check |
| prepareInputs | (op, inputs) => Promise<Record> | | Pre-execution input transformation |
| persistArtifact | ({artifactId,data,...}) => Promise<{uri?}> | | Storage persistence callback |
| onEvent | (event: PipelineEvent) => void | | Lifecycle event listener |
| onCost | (record: CostRecord) => void | | Per-operation cost callback |
| persistence | PipelineStateStore | | Run state persistence (enables resume) |
| ledger | CostLedger | | Cost accounting ledger |
| routeStepFn | RouteStepFn | | Route-based provider selection |
| variantsStepFn | VariantsStepFn | | Variants execution callback |
| ratiosStepFn | RatiosStepFn | | Aspect-ratio fan-out callback |
| context | RunContext | | Run context (voices, styles, brand kit) |
| contextResolveFn | ContextResolveFn | | Context reference resolution |
| gateEvalFn | GateEvalFn | | Custom gate evaluation (loudness, safety) |
| tenantPolicyEnforceFn | (provider?, model?) => void | | Per-tenant allow-list enforcement |
| signProvenance | ({artifactId, runId, ...}) => Promise<{signedArtifactId,manifestUri}> | | C2PA provenance signing |

Provider Interface

typescript
interface Provider {
  readonly name: string;
  readonly supportedOperations: string[];
  execute(operation: string, inputs: Record<string, unknown>, config: Record<string, unknown>): Promise<{
    data?: Buffer | NodeJS.ReadableStream;
    artifact: Omit<Artifact, 'id' | 'createdAt'>;
    cost_usd?: number;
    duration_ms?: number;
  }>;
  healthCheck(): Promise<boolean>;
  estimateCost?(input: { operation: string; params: Record<string, unknown>; config: Record<string, unknown> }): Promise<{ costUsd: number; estimatedDurationMs?: number }>;
}
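
To make the contract above more concrete, here is a minimal sketch of a custom provider. It is self-contained, so it declares simplified local stand-ins (`ArtifactDraft`, `ProviderLike`) rather than importing the package's types; those names, the `echo.generate` operation, and the `"text"` artifact type are assumptions made for illustration. The optional `data` and `estimateCost` members are omitted.

```typescript
// Simplified local stand-ins for the package's types (assumptions for this sketch).
interface ArtifactDraft {
  type: string;
  uri: string;
  mimeType: string;
  metadata: Record<string, unknown>;
}

interface ProviderLike {
  readonly name: string;
  readonly supportedOperations: string[];
  execute(
    operation: string,
    inputs: Record<string, unknown>,
    config: Record<string, unknown>,
  ): Promise<{ artifact: ArtifactDraft; cost_usd?: number; duration_ms?: number }>;
  healthCheck(): Promise<boolean>;
}

const ECHO_OPERATIONS = ["echo.generate"];

// A hypothetical provider that "generates" a text artifact by echoing the prompt.
const echoProvider: ProviderLike = {
  name: "echo",
  supportedOperations: ECHO_OPERATIONS,
  async execute(operation, inputs) {
    if (!ECHO_OPERATIONS.includes(operation)) {
      throw new Error(`Unsupported operation: ${operation}`);
    }
    const started = Date.now();
    const prompt = String(inputs.prompt ?? "");
    return {
      artifact: {
        type: "text",
        uri: `data:text/plain,${encodeURIComponent(prompt)}`,
        mimeType: "text/plain",
        metadata: { length: prompt.length },
      },
      cost_usd: 0, // nothing is billed for an echo
      duration_ms: Date.now() - started,
    };
  },
  async healthCheck() {
    return true; // no external dependency to probe
  },
};

echoProvider
  .execute("echo.generate", { prompt: "A sunset over mountains" }, {})
  .then((result) => console.log(result.artifact.mimeType, result.artifact.metadata));
```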

PipelineValidator

Validates pipeline definitions with schema checks, reference integrity, provider availability, and config warnings.

typescript
class PipelineValidator {
  constructor(providerAvailability: ProviderAvailability);
  validate(definition: PipelineDefinition): ValidationResult;
}

Validation checks performed:

  1. Zod schema validation against PipelineDefinitionSchema
  2. Duplicate step ID and path-traversal character detection
  3. Circular/forward reference detection in {{step_id.output}} patterns
  4. Provider availability for each operation
  5. Quality gate configuration completeness (retry without maxRetries, llm-judge without prompt, etc.)
  6. Budget config validity (maxUsd > 0, onExceed must be 'abort' or 'suspend')
  7. Variants config validity (n between 2-16, valid seedStrategy, judge configured)
  8. Run context validity (valid voice providers, clean style names)
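
Checks 2 and 3 can be pictured with a small standalone sketch. Because steps run sequentially, a reference is only valid if it points at a step that appears earlier in the list, so a self-reference or forward reference also covers the circular case. The `StepLike` type and `checkStepReferences` function are hypothetical and do not reflect the validator's internals.

```typescript
// Hypothetical minimal step shape for this sketch.
interface StepLike {
  id: string;
  inputs: Record<string, unknown>;
}

// Returns human-readable problems; an empty array means the checks passed.
function checkStepReferences(steps: StepLike[]): string[] {
  const errors: string[] = [];
  const seen = new Set<string>();
  const allIds = new Set<string>(steps.map((s) => s.id));

  for (const step of steps) {
    if (seen.has(step.id)) {
      errors.push(`Duplicate step ID "${step.id}"`);
    }
    // Find every {{step_id.output}} placeholder in this step's inputs.
    const pattern = /\{\{\s*([\w-]+)\.output\s*\}\}/g;
    const text = JSON.stringify(step.inputs);
    let match: RegExpExecArray | null;
    while ((match = pattern.exec(text)) !== null) {
      const ref = match[1];
      if (ref === step.id) {
        errors.push(`Step "${step.id}" references its own output`);
      } else if (!seen.has(ref)) {
        const kind = allIds.has(ref) ? "a later step" : "an unknown step";
        errors.push(`Step "${step.id}" references ${kind} "${ref}"`);
      }
    }
    seen.add(step.id);
  }
  return errors;
}

// The steps are listed in the wrong order, so "upscale" points forward.
const problems = checkStepReferences([
  { id: "upscale", inputs: { artifact_id: "{{generate.output}}" } },
  { id: "generate", inputs: { prompt: "A sunset over mountains" } },
]);
console.log(problems.length); // 1
```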

PipelineEstimator

Dry-run cost estimation with per-step bands.

typescript
class PipelineEstimator {
  constructor(options?: PipelineEstimatorOptions);
  estimate(pipeline: PipelineDefinition): Promise<PipelineEstimate>;
}
 
interface PipelineEstimate {
  totalUsdLow: number;
  totalUsdHigh: number;
  perStep: StepEstimate[];
  warnings: EstimateWarning[];
}
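
One plausible reading of `PipelineEstimate` is that the totals are the sums of the per-step bands. The sketch below illustrates that reading only; the `StepBand` shape and `sumBands` helper are hypothetical, and the package's `StepEstimate` may carry more fields or combine bands differently.

```typescript
// Assumed minimal shape; the package's StepEstimate may carry more fields.
interface StepBand {
  stepId: string;
  usdLow: number;
  usdHigh: number;
}

// Add up the low and high ends of each step's cost band.
function sumBands(perStep: StepBand[]): { totalUsdLow: number; totalUsdHigh: number } {
  let totalUsdLow = 0;
  let totalUsdHigh = 0;
  for (const step of perStep) {
    totalUsdLow += step.usdLow;
    totalUsdHigh += step.usdHigh;
  }
  return { totalUsdLow, totalUsdHigh };
}

const totals = sumBands([
  { stepId: "generate", usdLow: 0.25, usdHigh: 0.5 },
  { stepId: "upscale", usdLow: 0.125, usdHigh: 0.25 },
]);
console.log(totals.totalUsdLow, totals.totalUsdHigh); // 0.375 0.75
```

Comparing `totalUsdHigh` against a budget's `maxUsd` before executing gives a conservative go/no-go signal.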

ArtifactRegistry

In-memory artifact tracking for pipeline execution.

typescript
class ArtifactRegistry {
  register(artifact: Omit<Artifact, 'id'>): Artifact;
  registerWithId(id: string, artifact: Omit<Artifact, 'id'>): Artifact;
  get(id: string): Artifact | undefined;
  delete(id: string): boolean;
  list(): Artifact[];
  findBySourceStep(stepId: string): Artifact | undefined;
  deleteBySourceStep(stepId: string): number;
  clear(): void;
  size(): number;
}

Quality Gates

typescript
import {
  createQualityGateEvaluator,
  ThresholdEvaluator,
  DimensionCheckEvaluator,
  LLMJudgeEvaluator,
  CustomEvaluator,
} from "@reaatech/media-pipeline-mcp-core";
 
const evaluator = createQualityGateEvaluator(
  { type: "threshold", config: { checks: [{ field: "metadata.width", operator: ">=", value: 1024 }] }, action: "fail" },
  llmJudgeFn,   // required for llm-judge type
  customCheckFn // required for custom type
);
 
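// `gate` is the gate config passed to createQualityGateEvaluator above; `artifact` is the Artifact to check.
const result = await evaluator.evaluate(gate, artifact);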
// { passed: boolean, reasoning: string, score?: number, action: 'fail' | 'retry' | 'warn' }

Gate Types

| Type | Config | Description |
| --- | --- | --- |
| threshold | { checks: [{ field, operator, value }] } | Numeric checks on artifact metadata fields. Operators: >=, <=, >, <, ==, != |
| dimension-check | { expectedWidth, expectedHeight, tolerance? } | Verify output dimensions within tolerance (0-1) |
| llm-judge | { prompt, model?, timeout? } | LLM evaluates output quality against a prompt. Requires llmJudgeFn injection |
| custom | { [key: string]: unknown } | Arbitrary config passed to customCheckFn |
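
To show how the threshold and dimension-check configs might be interpreted, here is a standalone sketch. It assumes that `field` is a dotted path into the artifact, that every check must pass, and that `tolerance` is a relative fraction of the expected size; none of these details are confirmed by the table, and the helper names are hypothetical.

```typescript
type Operator = ">=" | "<=" | ">" | "<" | "==" | "!=";

interface Check {
  field: string;
  operator: Operator;
  value: number;
}

// Walk a dotted path such as "metadata.width" through nested objects.
function readField(source: unknown, path: string): unknown {
  let current: unknown = source;
  for (const key of path.split(".")) {
    if (typeof current !== "object" || current === null) return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

function compare(actual: number, operator: Operator, expected: number): boolean {
  switch (operator) {
    case ">=": return actual >= expected;
    case "<=": return actual <= expected;
    case ">": return actual > expected;
    case "<": return actual < expected;
    case "==": return actual === expected;
    case "!=": return actual !== expected;
  }
}

// Assumed semantics: a threshold gate passes only when every check passes,
// and a missing or non-numeric field counts as a failure.
function passesThreshold(artifact: unknown, checks: Check[]): boolean {
  return checks.every((check) => {
    const actual = readField(artifact, check.field);
    return typeof actual === "number" && compare(actual, check.operator, check.value);
  });
}

// Assumed semantics: deviation from the expected size, relative to that size.
function withinTolerance(actual: number, expected: number, tolerance = 0): boolean {
  return Math.abs(actual - expected) <= expected * tolerance;
}

const sampleArtifact = { metadata: { width: 1000, height: 1024 } };
const wideEnough = passesThreshold(sampleArtifact, [
  { field: "metadata.width", operator: ">=", value: 1024 },
]);
console.log(wideEnough); // false
console.log(withinTolerance(1000, 1024, 0.05)); // true
```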

Gate Actions

| Action | Behavior |
| --- | --- |
| fail | Halt pipeline execution immediately |
| retry | Re-execute the step up to maxRetries times |
| warn | Log a warning and continue execution |
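
The three actions can be sketched as a small control loop. This is a simplified, synchronous illustration (real steps are asynchronous), and it assumes that a retry gate which still fails after `maxRetries` re-executions ends in failure. `runWithGate` and `GateOutcome` are hypothetical names.

```typescript
type GateAction = "fail" | "retry" | "warn";

interface GateOutcome<T> {
  status: "passed" | "warned" | "failed";
  attempts: number;
  value: T;
}

// runStep produces an output; passes decides whether the gate accepts it.
function runWithGate<T>(
  runStep: () => T,
  passes: (value: T) => boolean,
  action: GateAction,
  maxRetries = 0,
): GateOutcome<T> {
  // Only the retry action is allowed to execute the step more than once.
  const maxAttempts = action === "retry" ? maxRetries + 1 : 1;
  let value = runStep();
  let attempts = 1;
  while (!passes(value) && attempts < maxAttempts) {
    value = runStep(); // re-execute the step
    attempts += 1;
  }
  if (passes(value)) return { status: "passed", attempts, value };
  if (action === "warn") {
    console.warn("Quality gate failed; continuing");
    return { status: "warned", attempts, value };
  }
  return { status: "failed", attempts, value };
}

// A fake step whose output doubles in width on every execution.
let width = 256;
const outcome = runWithGate(() => (width *= 2), (w) => w >= 1024, "retry", 2);
console.log(outcome.status, outcome.attempts, outcome.value); // passed 2 1024
```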

MockProvider

Configurable simulated provider for development and testing.

typescript
class MockProvider implements Provider {
  readonly name: string;
  readonly supportedOperations: string[];
 
  constructor(config?: {
    name?: string;                         // default: "mock"
    operations?: string[];                 // default: ["mock.generate", "mock.transform", "mock.extract"]
    delay?: number;                        // Simulated latency in ms (default: 100)
    failureRate?: number;                  // 0-1 probability of failure (default: 0)
    baseCost?: number;                     // Cost per operation (default: 0.001)
    alwaysPass?: boolean;                  // Generate high-quality (0.99) metadata (default: false)
  });
}

Event Bus

Typed event bus for pipeline lifecycle events.

typescript
import { createEventBus } from "@reaatech/media-pipeline-mcp-core";
 
const bus = createEventBus<{ kind: "step:complete"; stepId: string; artifactId: string }>();
 
const dispose = bus.on("step:complete", (event) => {
  console.log(`Step ${event.stepId} completed: ${event.artifactId}`);
});
 
bus.emit({ kind: "step:complete", stepId: "generate", artifactId: "artifact-123" });
 
// Promise-based await with optional predicate and timeout
const event = await bus.await("step:complete", (e) => e.stepId === "generate", 5000);
 
dispose(); // Unsubscribe
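
For readers curious how kind-based subscription and a promise-based wait with a timeout fit together, here is a minimal standalone sketch. It is not the package's `createEventBus`; `createMiniBus` and its `waitFor` method (standing in for `bus.await`) are hypothetical.

```typescript
type BusEvent = { kind: string };
type Handler<E> = (event: E) => void;

function createMiniBus<E extends BusEvent>() {
  const handlers = new Map<string, Set<Handler<E>>>();

  function on(kind: E["kind"], handler: Handler<E>): () => void {
    const set = handlers.get(kind) ?? new Set<Handler<E>>();
    set.add(handler);
    handlers.set(kind, set);
    return () => {
      set.delete(handler); // dispose function
    };
  }

  function emit(event: E): void {
    const set = handlers.get(event.kind);
    if (!set) return;
    // Copy first so handlers may unsubscribe while being called.
    Array.from(set).forEach((handler) => handler(event));
  }

  // Resolves with the first matching event, or rejects after timeoutMs.
  function waitFor(kind: E["kind"], predicate: (e: E) => boolean, timeoutMs?: number): Promise<E> {
    return new Promise<E>((resolve, reject) => {
      let timer: ReturnType<typeof setTimeout> | undefined;
      const dispose = on(kind, (event) => {
        if (!predicate(event)) return;
        if (timer !== undefined) clearTimeout(timer);
        dispose();
        resolve(event);
      });
      if (timeoutMs !== undefined) {
        timer = setTimeout(() => {
          dispose();
          reject(new Error(`Timed out waiting for "${kind}"`));
        }, timeoutMs);
      }
    });
  }

  return { on, emit, waitFor };
}

type StepComplete = { kind: "step:complete"; stepId: string };
const miniBus = createMiniBus<StepComplete>();
const seen: string[] = [];
const stop = miniBus.on("step:complete", (e) => {
  seen.push(e.stepId);
});
miniBus.emit({ kind: "step:complete", stepId: "generate" });
stop();
miniBus.emit({ kind: "step:complete", stepId: "upscale" }); // no longer observed
console.log(seen.length, seen[0]); // 1 generate
```

Clearing the timer once the event arrives keeps the process from waiting on a stale timeout.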

Core Types

| Export | Description |
| --- | --- |
| PipelineSchema / Pipeline | Full pipeline with steps, status, artifacts, and timing |
| PipelineDefinitionSchema / PipelineDefinition | User-supplied pipeline definition with optional budget/context |
| PipelineStepSchema / PipelineStep | Step with id, operation, inputs, config, quality gate, variants, cache, route |
| ArtifactSchema / Artifact | Pipeline output with id, type, uri, mimeType, metadata, sourceStep |
| QualityGateSchema / QualityGate | Gate with type, config, action, maxRetries |
| QualityGateResultSchema / QualityGateResult | Evaluation result with passed, reasoning, score, action |
| CostRecordSchema / CostRecord | Per-operation cost entry with operation, provider, model, cost_usd |
| CostSummarySchema / CostSummary | Aggregated costs by operation, provider, pipeline |
| PipelineEventSchema / PipelineEvent | Lifecycle event with type, pipelineId, stepId, timestamp, data |
| PipelineStatus | "pending" \| "running" \| "completed" \| "failed" \| "gated" \| "cancelled" |
| PipelineRunRecord | Persistence-layer run record with runId, definition, stepStates, cost |
| StepStateRecord | Per-step state in persistence: status, artifactId, attempts, timing |
| PipelineStateStore | Persistence interface: createRun, getRun, updateRun, acquireLock, releaseLock, listRuns |
| CostLedger | Cost interface: charge, getRunCost, getTotalCost |
| BudgetConfig | Budget with maxUsd, onExceed, warnAtPct |
| PipelineEstimate / StepEstimate / EstimateWarning | Dry-run estimation results |
| RunContext / VoiceRef / StyleRef / BrandKit | Run context for voice/style/brand resolution |
| VariantsConfig / VariantResult / VariantsStepOutput | Variants configuration and results |
| JudgeConfig / JudgeRubric | Variants judge types (llm-judge, image-judge, rule, custom) |
| ArtifactMetaSchema / ArtifactMeta | Storage-level artifact metadata |
| ValidationResultSchema / ValidationResult | Validator output with valid, errors, warnings, estimates |
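
The three `BudgetConfig` fields (maxUsd, onExceed, warnAtPct) suggest a simple preflight decision before each step. The sketch below is one way such a check could work, assuming `warnAtPct` is a fraction of `maxUsd` (the Quick Start uses 0.8). `BudgetLike`, `BudgetDecision`, and `checkBudget` are hypothetical names, not part of the package.

```typescript
interface BudgetLike {
  maxUsd: number;
  onExceed: "abort" | "suspend";
  warnAtPct?: number; // assumed to be a fraction of maxUsd, e.g. 0.8
}

type BudgetDecision = "ok" | "warn" | "abort" | "suspend";

// Decide what to do before running a step expected to cost nextStepUsd.
function checkBudget(spentUsd: number, nextStepUsd: number, budget: BudgetLike): BudgetDecision {
  const projected = spentUsd + nextStepUsd;
  if (projected > budget.maxUsd) return budget.onExceed;
  if (budget.warnAtPct !== undefined && projected >= budget.maxUsd * budget.warnAtPct) {
    return "warn";
  }
  return "ok";
}

const sampleBudget: BudgetLike = { maxUsd: 1.0, onExceed: "abort", warnAtPct: 0.8 };
console.log(checkBudget(0.25, 0.25, sampleBudget)); // ok
console.log(checkBudget(0.5, 0.375, sampleBudget)); // warn
console.log(checkBudget(0.75, 0.5, sampleBudget)); // abort
```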

Error Classes

All errors extend A2AError and carry code and retryable properties.

| Error Class | Code | Retryable | Description |
| --- | --- | --- | --- |
| IdempotencyConflictError | IDEMPOTENCY_CONFLICT | No | Duplicate idempotency key with in-flight or body-mismatch |
| BudgetExceededError | BUDGET_EXCEEDED | No | Budget cap reached (run, tenant-daily, or tenant-monthly) |
| RunNotFoundError | RUN_NOT_FOUND | No | Persisted run not found for resume |
| RunInProgressError | RUN_IN_PROGRESS | Yes | Run lock held by another process |
| RunNotResumableError | RUN_NOT_RESUMABLE | No | Run in terminal state or resumable: false |
| WebhookSignatureInvalidError | WEBHOOK_SIGNATURE_INVALID | No | Webhook HMAC verification failed |
| WebhookProviderUnknownError | WEBHOOK_PROVIDER_UNKNOWN | No | Webhook provider not recognized |
| StateStoreUnavailableError | STATE_STORE_UNAVAILABLE | Yes | Persistence backend unreachable |
| EstimateUnsupportedError | ESTIMATE_UNSUPPORTED | No | Cost estimation not available for operation |
| ArtifactNotFoundError | ARTIFACT_NOT_FOUND | No | Referenced artifact missing from registry |
| RouterAllCandidatesFailedError | ROUTER_ALL_CANDIDATES_FAILED | No | All routing candidates exhausted |
| RouterNoCandidatesError | ROUTER_NO_CANDIDATES | No | Empty candidate list for routing |
| RouterFastestIneligibleError | ROUTER_FASTEST_INELIGIBLE | No | Candidate exceeds fastest strategy duration cap |
| SafetyGateRejectedError | SAFETY_GATE_REJECTED | No | Content safety check blocked artifact |
| TenantNotFoundError | TENANT_NOT_FOUND | No | Tenant not found in multi-tenant deployment |
| KeyVaultUnavailableError | KEY_VAULT_UNAVAILABLE | Yes | Key vault service unreachable |
| FfmpegUnavailableError | FFMPEG_UNAVAILABLE | No | ffmpeg not found for audio/video processing |
| VariantsAllRejectedError | VARIANTS_ALL_REJECTED | No | All variants failed (safety, judge-low, generation-error) |
| JudgeUnavailableError | JUDGE_UNAVAILABLE | Yes | LLM judge service unreachable |
| WorkflowNotFoundError | WORKFLOW_NOT_FOUND | No | ComfyUI workflow not found |
| WorkflowExpiredError | WORKFLOW_EXPIRED | No | ComfyUI workflow exceeded retention period |
| ContextRefUnknownError | CONTEXT_REF_UNKNOWN | No | Context reference (voice/style/brand) not found |
| ContextRefTypeError | CONTEXT_REF_TYPE_MISMATCH | No | Context reference type mismatch for operation |
| LoudnessGateFailedError | LOUDNESS_GATE_FAILED | No | Audio loudness gate out of tolerance |
| TenantPolicyViolationError | TENANT_POLICY_VIOLATION | No | Provider/model blocked by tenant allow-list |
| ProvenanceSigningFailedError | PROVENANCE_SIGNING_FAILED | No | C2PA manifest signing failed |
| SafetyProviderUnavailableError | SAFETY_PROVIDER_UNAVAILABLE | Yes | Safety classifier unreachable |
| RatioUnsupportedError | RATIO_UNSUPPORTED | No | Aspect ratio not natively supported by provider |
| InvalidInputError | INVALID_INPUT | No | Invalid step input |
| FormatUnsupportedError | FORMAT_UNSUPPORTED | No | Output format not supported for operation |
| ArtifactAccessDeniedError | ARTIFACT_ACCESS_DENIED | No | Tenant-scoped access denied |
| InvalidResourceUriError | INVALID_RESOURCE_URI | No | Invalid resource URI format |
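
The `retryable` flag lends itself to a generic retry wrapper. The following standalone sketch uses a local `CodedError` class as a stand-in for the package's error base class (only the `code` and `retryable` properties are taken from the table above); `withRetry` is a hypothetical helper, written synchronously for brevity.

```typescript
// Stand-in for the package's error base class (assumed shape: code + retryable).
class CodedError extends Error {
  readonly code: string;
  readonly retryable: boolean;

  constructor(message: string, code: string, retryable: boolean) {
    super(message);
    Object.setPrototypeOf(this, new.target.prototype); // keep instanceof working on older targets
    this.name = "CodedError";
    this.code = code;
    this.retryable = retryable;
  }
}

// Retry only errors that mark themselves retryable; rethrow everything else.
function withRetry<T>(task: () => T, maxAttempts: number): T {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
    try {
      return task();
    } catch (error) {
      lastError = error;
      const retryable = error instanceof CodedError && error.retryable;
      if (!retryable) throw error;
    }
  }
  throw lastError; // attempts exhausted
}

// Simulate a run lock that is released after two failed attempts.
let calls = 0;
const outcomeValue = withRetry(() => {
  calls += 1;
  if (calls < 3) throw new CodedError("lock held", "RUN_IN_PROGRESS", true);
  return "resumed";
}, 5);
console.log(outcomeValue, calls); // resumed 3
```

A production version would typically be asynchronous and wait between attempts; the point here is only the branch on `retryable`.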

Usage Patterns

Pipeline with Persistence and Resume

typescript
const executor = new PipelineExecutor({
  providers: [new MockProvider()],
  persistence: stateStore,     // PipelineStateStore for durability
  ledger: costLedger,          // CostLedger for cost tracking
  onEvent: (event) => console.log(event.type, event.pipelineId),
  onCost: (record) => console.log(`Cost: $${record.cost_usd}`),
});
 
// Execute with a specific runId for idempotency
const result = await executor.execute(definition, { runId: "run-001" });
 
// Resume from persistence
const resumed = await executor.resume("run-001");
// Or resume from a specific step
const resumedFrom = await executor.resume("run-001", "upscale");

Dry-Run Estimation

typescript
const estimator = new PipelineEstimator({
  estimateOperation: async (operation, config) => {
    return { usdLow: 0.001, usdHigh: 0.01 };
  },
  ledger: costLedger,
});
 
const estimate = await estimator.estimate(definition);
console.log(`Cost range: $${estimate.totalUsdLow} - $${estimate.totalUsdHigh}`);

Validation with Provider Checks

typescript
const validator = new PipelineValidator({
  isAvailable: (op) => providers.has(op),
  getEstimatedCost: (op) => 0.01,
  getEstimatedDuration: (op) => 5000,
});
 
const result = validator.validate(definition);
if (!result.valid) {
  result.errors.forEach(e => console.error(e));
}
result.warnings.forEach(w => console.warn(w));

LLM-Judge Quality Gate

typescript
const executor = new PipelineExecutor({
  providers: [new MockProvider()],
  llmJudgeFn: async (prompt, artifact) => {
    // callLLM is a placeholder for your own LLM client call
    const response = await callLLM(prompt, artifact.uri);
    return { pass: response.score >= 7, reasoning: response.explanation, score: response.score };
  },
});
 
// Step with an llm-judge gate (add it to a pipeline definition's steps array):
const judgedStep = {
  id: "generate",
  operation: "image.generate",
  inputs: { prompt: "..." },
  config: {},
  qualityGate: {
    type: "llm-judge",
    config: { prompt: "Rate image quality from 1-10", timeout: 30000 },
    action: "retry",
    maxRetries: 2,
  },
};

Event Bus Usage

typescript
const bus = createEventBus<{ kind: "pipeline:complete"; pipelineId: string; artifacts: string[] }>();
 
bus.on("pipeline:complete", (event) => {
  console.log(`Pipeline ${event.pipelineId} done with ${event.artifacts.length} artifacts`);
});
 
// Await an event
const completed = await bus.await("pipeline:complete", (e) => e.pipelineId === "my-pipeline", 60000);

License

MIT