Embedding Guide

This guide walks you through embedding Constellation Engine in your own JVM application. By the end you will have a standalone program that compiles a constellation-lang script, executes it, and prints the result — no HTTP server required.

Overview

Constellation Engine is an embeddable library. The HTTP server and VSCode extension are optional layers built on top of the same API you use directly:

Your Application
└─ Constellation API (compile, execute, manage modules)
   ├─ Runtime  (DAG execution, scheduling, lifecycle)
   ├─ Compiler (parse, type-check, compile constellation-lang)
   └─ StdLib   (built-in modules: math, string, list, etc.)

All interactions go through two main entry points:

Entry Point          Purpose
Constellation trait  Register modules, store pipelines, execute pipelines
LangCompiler         Compile constellation-lang source into LoadedPipeline

Add Dependencies

sbt

val constellationVersion = "0.7.0"

libraryDependencies ++= Seq(
  "io.github.vledicfranco" %% "constellation-core"          % constellationVersion,
  "io.github.vledicfranco" %% "constellation-runtime"       % constellationVersion,
  "io.github.vledicfranco" %% "constellation-lang-compiler" % constellationVersion,
  "io.github.vledicfranco" %% "constellation-lang-stdlib"   % constellationVersion
)

Add the HTTP module only if you need the server:

libraryDependencies += "io.github.vledicfranco" %% "constellation-http-api" % constellationVersion

Required Transitive Dependencies

The library pulls in:

  • Cats Effect 3 — IO monad for all effectful operations
  • Circe — JSON encoding/decoding
  • cats-parse — parser combinators (compiler module)
  • http4s — only if you include http-api

Minimal Setup

import cats.effect._
import cats.implicits._
import io.constellation.impl.ConstellationImpl
import io.constellation.stdlib.StdLib
import io.constellation.lang.LangCompiler

object MinimalExample extends IOApp.Simple {

  val source = """
in text: String
result = Uppercase(text)
out result
"""

  def run: IO[Unit] =
    for {
      // 1. Create a Constellation instance
      constellation <- ConstellationImpl.init

      // 2. Register standard library modules
      _ <- StdLib.allModules.values.toList.traverse(constellation.setModule)

      // 3. Create a compiler with StdLib function signatures
      compiler = StdLib.compiler

      // 4. Compile the source
      compiled <- IO.fromEither(
        compiler.compile(source, "my-pipeline").leftMap(errs =>
          new RuntimeException(errs.map(_.message).mkString("\n"))
        )
      )

      // 5. Execute the pipeline
      sig <- constellation.run(
        compiled.pipeline,
        inputs = Map("text" -> io.constellation.TypeSystem.CValue.VString("hello world"))
      )

      // 6. Read outputs
      _ <- IO.println(s"Outputs: ${sig.outputs}")
    } yield ()
}

Compile and Execute

Compilation

LangCompiler.compile parses, type-checks, and compiles constellation-lang source into a CompilationOutput:

val result: Either[List[CompileError], CompilationOutput] =
  compiler.compile(source, dagName)

CompilationOutput contains:

Field     Type                  Description
pipeline  LoadedPipeline        The compiled pipeline (includes image.dagSpec and syntheticModules)
warnings  List[CompileWarning]  Non-fatal compilation warnings

Execution

Pass the compiled pipeline and inputs to the runtime:

val sig: IO[DataSignature] = constellation.run(
  compiled.pipeline,
  inputs // Map[String, CValue]
)

DataSignature provides:

Field          Description
outputs        Map[String, CValue] — declared pipeline output values
computedNodes  Map[String, CValue] — all intermediate computed values
status         PipelineStatus — Completed, Suspended, or Failed
missingInputs  List[String] — inputs that were not provided
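The status field tells you whether the outputs can be trusted. The branching below is a minimal, self-contained sketch of that decision; DataSignature and PipelineStatus here are local stand-ins for the library's types (with simplified fields), not the real API:

```scala
// Local stand-ins mirroring the fields described above (illustrative only)
sealed trait PipelineStatus
case object Completed extends PipelineStatus
case object Suspended extends PipelineStatus
case object Failed extends PipelineStatus

case class DataSignature(
  outputs: Map[String, String],
  status: PipelineStatus,
  missingInputs: List[String]
)

// Branch on status: only a Completed run has trustworthy outputs
def describe(sig: DataSignature): String = sig.status match {
  case Completed => s"ok: ${sig.outputs.size} output(s)"
  case Suspended => s"waiting on inputs: ${sig.missingInputs.mkString(", ")}"
  case Failed    => "execution failed"
}

val done  = DataSignature(Map("result" -> "HELLO"), Completed, Nil)
val stuck = DataSignature(Map.empty, Suspended, List("text"))
```

A Suspended signature pairs naturally with missingInputs: the caller can gather the listed values and resume.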

Reusing Compiled Pipelines

Compile once, execute many times:

// Store the pipeline image for later execution
val hash = constellation.pipelineStore.store(compiled.pipeline.image)

// Optionally give it a human-readable alias
constellation.pipelineStore.alias("my-pipeline", hash)

// Execute by name or hash
val sig = constellation.run("my-pipeline", inputs, ExecutionOptions())
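Conceptually the store is content-addressed: images are keyed by their hash, and aliases map human-readable names to hashes. Below is a minimal in-memory sketch of that idea; the names and types are illustrative, and the real pipelineStore API may differ:

```scala
import scala.collection.mutable

// Content-addressed store: images keyed by hash, aliases map names to hashes
class PipelineStoreSketch {
  private val images  = mutable.Map.empty[String, String] // hash -> image (stand-in)
  private val aliases = mutable.Map.empty[String, String] // name -> hash

  def store(image: String): String = {
    val hash = image.hashCode.toHexString // illustrative hash only
    images(hash) = image
    hash
  }

  def alias(name: String, hash: String): Unit = aliases(name) = hash

  // Resolve either a raw hash or an alias back to the stored image
  def lookup(ref: String): Option[String] =
    images.get(ref).orElse(aliases.get(ref).flatMap(images.get))
}

val store = new PipelineStoreSketch
val hash  = store.store("compiled-image-bytes")
store.alias("my-pipeline", hash)
```

Storing the same image twice yields the same key, which is what makes compile-once/run-many cheap.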

Complete Runnable Example

This self-contained example compiles and runs a two-step text pipeline:

import cats.effect._
import cats.implicits._
import io.constellation._
import io.constellation.TypeSystem._
import io.constellation.TypeSystem.CValue._
import io.constellation.impl.ConstellationImpl
import io.constellation.stdlib.StdLib
import io.constellation.lang.LangCompiler

object TextPipeline extends IOApp.Simple {

  val source = """
in text: String

trimmed = Trim(text)
upper = Uppercase(trimmed)
words = WordCount(upper)

out upper
out words
"""

  def run: IO[Unit] =
    for {
      constellation <- ConstellationImpl.init
      _ <- StdLib.allModules.values.toList.traverse(constellation.setModule)

      compiler = StdLib.compiler
      compiled <- IO.fromEither(
        compiler.compile(source, "text-pipeline").leftMap(errs =>
          new RuntimeException(errs.map(_.message).mkString("\n"))
        )
      )

      sig <- constellation.run(
        compiled.pipeline,
        Map("text" -> VString(" hello world "))
      )

      _ <- IO.println(s"upper = ${sig.outputs.get("upper")}")
      _ <- IO.println(s"words = ${sig.outputs.get("words")}")
    } yield ()
}

Expected output:

upper = Some(VString(HELLO WORLD))
words = Some(VLong(2))
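The three stdlib calls correspond to plain string transformations. The sketch below shows what Trim, Uppercase, and WordCount conceptually compute in ordinary Scala; in the engine they run as DAG nodes rather than direct function calls, so this is only a mental model:

```scala
// Plain-Scala equivalents of the three stdlib modules used above (conceptual)
def trim(s: String): String      = s.trim
def uppercase(s: String): String = s.toUpperCase
def wordCount(s: String): Long   = s.split("\\s+").count(_.nonEmpty).toLong

// Same data flow as the pipeline: text -> trimmed -> upper -> words
val text    = " hello world "
val trimmed = trim(text)
val upper   = uppercase(trimmed)
val words   = wordCount(upper)
```

Because upper and words both derive from trimmed, the engine can schedule independent branches of such a DAG concurrently.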

Adding Custom Modules

Define a Module

Use ModuleBuilder with case class inputs and outputs:

import io.constellation.ModuleBuilder
import io.constellation.lang.semantic.{FunctionSignature, SemanticType}

// Input/output case classes — field names become parameter names in constellation-lang
case class SentimentInput(text: String)
case class SentimentOutput(score: Double, label: String)

val sentimentModule = ModuleBuilder
  .metadata("Sentiment", "Analyzes text sentiment", 1, 0)
  .tags("ml", "nlp")
  .implementationPure[SentimentInput, SentimentOutput] { input =>
    // Replace with your actual ML model call
    val score = if (input.text.contains("good")) 0.9 else 0.1
    val label = if (score > 0.5) "positive" else "negative"
    SentimentOutput(score, label)
  }
  .build

For modules that perform IO (HTTP calls, database queries, file reads):

.implementation[SentimentInput, SentimentOutput] { input =>
  IO {
    // Side-effectful operations here
    callExternalApi(input.text)
  }
}

Pure vs IO Implementations

Use implementationPure for CPU-bound, side-effect-free operations. Use implementation (returns IO) for anything involving external resources, network calls, or blocking operations.
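The distinction matters because IO defers execution: constructing the value performs nothing, and the effect only runs when the runtime interprets it. The sketch below illustrates that laziness with a tiny thunk wrapper; it is a stand-in for the idea, not cats-effect itself:

```scala
// Tiny stand-in for IO's key property: describing an effect does not run it
final case class Effect[A](run: () => A)
object Effect {
  def delay[A](a: => A): Effect[A] = Effect(() => a)
}

var calls = 0
val effect = Effect.delay { calls += 1; "called external API" }

// Merely constructing the effect performed nothing...
val before = calls
// ...the side effect happens only when explicitly executed
val result = effect.run()
```

implementationPure, by contrast, is an ordinary function: the runtime can call it freely without worrying about sequencing side effects.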

Register the Module

// Register the runtime module
constellation.setModule(sentimentModule)

// Register the function signature so the compiler recognizes it
val sentimentSig = FunctionSignature(
  name = "Sentiment",
  params = List("text" -> SemanticType.SString),
  returns = SemanticType.SRecord(Map(
    "score" -> SemanticType.SFloat,
    "label" -> SemanticType.SString
  )),
  moduleName = "Sentiment"
)

val compiler = LangCompiler.builder
  .withFunctions(StdLib.allSignatures :+ sentimentSig)
  .build

Now you can use Sentiment in constellation-lang scripts:

in review: String
analysis = Sentiment(review)
out analysis

Production Configuration

Builder API

ConstellationImpl.builder() provides full control over runtime behavior:

import io.constellation.impl.ConstellationImpl
import io.constellation.spi.ConstellationBackends
import io.constellation.execution._

val constellation = ConstellationImpl.builder()
  .withScheduler(scheduler)
  .withBackends(backends)
  .withDefaultTimeout(30.seconds)
  .withLifecycle(lifecycle)
  .build()

Bounded Scheduler

Production Recommendation

The default scheduler is unbounded — every task runs immediately. For production, use a bounded scheduler with priority ordering to prevent resource exhaustion under load.

import io.constellation.execution.GlobalScheduler

// Resource-based (recommended — cleans up on shutdown)
GlobalScheduler.bounded(
  maxConcurrency = 16,
  maxQueueSize = 1000,
  starvationTimeout = 30.seconds
).use { scheduler =>
  // Use scheduler here
}

Priority levels:

Priority    Value  Use Case
Critical    100    Health checks, control plane
High        80     User-facing requests
Normal      50     Default
Low         20     Background jobs
Background  0      Housekeeping
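Priority ordering can be sketched with a plain priority queue: higher numeric priority is dequeued first. The self-contained illustration below uses the values from the table; the real scheduler additionally enforces the concurrency and queue-size bounds shown earlier:

```scala
import scala.collection.mutable

// Tasks carry a numeric priority; higher values are dequeued first
case class Task(name: String, priority: Int)

val queue = mutable.PriorityQueue.empty[Task](Ordering.by[Task, Int](_.priority))
queue.enqueue(Task("batch-report", 20))  // Low
queue.enqueue(Task("health-check", 100)) // Critical
queue.enqueue(Task("user-request", 80))  // High

// Drain in priority order: Critical, then High, then Low
val order = List.fill(3)(queue.dequeue().name)
```

The starvationTimeout exists precisely because under this ordering a steady stream of high-priority work could otherwise delay low-priority tasks indefinitely.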

SPI Backends

ConstellationBackends bundles pluggable integrations. All default to no-op with zero overhead:

import io.constellation.spi._

val backends = ConstellationBackends(
  metrics = myPrometheusMetrics, // MetricsProvider
  tracer = myOtelTracer,         // TracerProvider
  listener = myKafkaListener,    // ExecutionListener
  cache = Some(myRedisCache)     // CacheBackend
)

See the SPI Integration Guides for implementation examples with popular libraries.

Lifecycle Management

ConstellationLifecycle enables graceful shutdown with in-flight execution draining:

import io.constellation.execution.ConstellationLifecycle

for {
  lifecycle <- ConstellationLifecycle.create
  constellation <- ConstellationImpl.builder()
    .withLifecycle(lifecycle)
    .build()

  // ... run your application ...

  // Graceful shutdown: wait up to 30s for in-flight executions
  _ <- lifecycle.shutdown(drainTimeout = 30.seconds)
} yield ()

Lifecycle states: Running → Draining → Stopped.
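The three states form a one-way progression: draining is only meaningful while running, and stopping only after draining. A self-contained sketch of that transition discipline follows; ConstellationLifecycle's actual API differs, this only models the legal state changes:

```scala
sealed trait LifecycleState
case object Running  extends LifecycleState
case object Draining extends LifecycleState
case object Stopped  extends LifecycleState

// Only forward transitions are legal: Running -> Draining -> Stopped
def transition(from: LifecycleState, to: LifecycleState): Either[String, LifecycleState] =
  (from, to) match {
    case (Running, Draining) => Right(Draining)
    case (Draining, Stopped) => Right(Stopped)
    case _                   => Left(s"illegal transition: $from -> $to")
  }
```

Encoding the progression this way makes double-shutdown and restart-after-drain bugs unrepresentable at the call site.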

Resource Cleanup

Always use ConstellationLifecycle.shutdown() before application exit. This ensures in-flight executions complete gracefully and resources are properly released.

Circuit Breakers

Protect modules from cascading failures:

import io.constellation.execution.CircuitBreakerConfig

val constellation = ConstellationImpl.builder()
  .withCircuitBreaker(CircuitBreakerConfig(
    failureThreshold = 5,       // Open after 5 consecutive failures
    resetDuration = 30.seconds, // Try again after 30s
    halfOpenMaxProbes = 1       // Allow 1 probe in half-open state
  ))
  .build()
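The config values map onto a small state machine: closed counts consecutive failures, open rejects calls until resetDuration elapses, and half-open admits a limited number of probes. The sketch below models only the failure-counting half (Closed/Open; timing-based reset and half-open probes omitted), as a self-contained illustration rather than the library's implementation:

```scala
sealed trait BreakerState
case object Closed extends BreakerState
case object Open   extends BreakerState

// Count consecutive failures; trip open at the threshold, reset on success
class BreakerSketch(failureThreshold: Int) {
  private var failures = 0
  var state: BreakerState = Closed

  def record(success: Boolean): Unit =
    if (success) { failures = 0; state = Closed }
    else {
      failures += 1
      if (failures >= failureThreshold) state = Open
    }
}

val breaker = new BreakerSketch(failureThreshold = 5)
(1 to 4).foreach(_ => breaker.record(success = false))
val beforeTrip = breaker.state
breaker.record(success = false) // fifth consecutive failure trips the breaker
```

Note that a single success resets the count, which is why the threshold is described as consecutive failures.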

Timeouts

For long-running pipelines, use IO.timeout:

val sig: IO[DataSignature] =
  constellation.run(compiled.pipeline, inputs)
    .timeout(30.seconds)

Optional: HTTP Server

Add the http-api dependency and start a server:

import io.constellation.http._

ConstellationServer
  .builder(constellation, compiler)
  .withHost("0.0.0.0")
  .withPort(8080)
  .withDashboard // Enable web dashboard
  .withAuth(AuthConfig(apiKeys = Map(
    "admin-key" -> ApiRole.Admin,
    "app-key" -> ApiRole.Execute
  )))
  .withCors(CorsConfig(allowedOrigins = Set("https://app.example.com")))
  .withRateLimit(RateLimitConfig(requestsPerMinute = 200, burst = 40))
  .withHealthChecks(HealthCheckConfig(enableDetailEndpoint = true))
  .run

All hardening features (auth, CORS, rate limiting) are opt-in and disabled by default. See the Security Model for details.

Optional: Cross-Process Modules

If you need modules that run outside the JVM (Python ML models, Go services, etc.), add the Module Provider SDK and wrap your Constellation instance with ModuleProviderManager:

libraryDependencies += "io.github.vledicfranco" %% "constellation-module-provider-sdk" % constellationVersion

import io.constellation.provider.{ModuleProviderManager, ProviderManagerConfig, JsonCValueSerializer}

for {
  constellation <- ConstellationImpl.builder().build()
  compiler <- LangCompiler.builder.build

  // Wrap with gRPC provider support
  manager <- ModuleProviderManager(
    delegate = constellation,
    compiler = compiler,
    config = ProviderManagerConfig(), // gRPC on port 9090
    serializer = JsonCValueSerializer
  )

  // Use 'manager' instead of 'constellation' — it's a drop-in wrapper
  _ <- StdLib.allModules.values.toList.traverse(manager.setModule)

  // HTTP API on port 8080 + gRPC provider service on port 9090
  _ <- ConstellationServer.builder(manager, compiler).run
} yield ()

External providers can then register modules via gRPC:

result = ml.Predict(text)      # Calls external Python provider
enriched = data.Enrich(record) # Calls external Go provider

See the Module Provider Integration Guide for provider SDK setup, configuration, and horizontal scaling.

Next Steps