API Guide
This guide covers programmatic usage of Constellation Engine, including compiling constellation-lang pipelines and creating custom modules.
When to Use the Programmatic API
Choose the programmatic API when you need tight integration with your Scala application and maximum performance. For distributed or polyglot architectures, consider the HTTP API instead.
| Scenario | Recommendation |
|---|---|
| Tight Scala integration | Programmatic - direct type safety, no serialization |
| Microservice architecture | HTTP API - language-agnostic, easier scaling |
| High-throughput, low-latency | Programmatic - no network overhead |
| Multiple client languages | HTTP API - universal REST interface |
| Dynamic pipeline loading | Either - both support hot reload |
Quick Decision Guide
- Building a Scala application? → Use programmatic API for best performance
- Building a polyglot system? → Use HTTP API for flexibility
- Need to scale independently? → Use HTTP API with multiple instances
- Embedding in existing Scala service? → Use programmatic API
Table of Contents
- Compiling Pipelines
- Registering Functions
- Creating Modules
- Running Pipelines
- Working with Types
- Error Handling
- Complete Example
- Cross-Process Modules with ModuleProviderManager
Compiling Pipelines
Basic Compilation
import io.constellation.lang.LangCompiler
val compiler = LangCompiler.empty
val source = """
in x: Int
in y: Int
out x
"""
val result = compiler.compile(source, "my-dag")
result match {
case Right(compiled) =>
val dagSpec = compiled.pipeline.image.dagSpec
println(s"DAG name: ${dagSpec.name}")
println(s"Modules: ${dagSpec.modules.size}")
println(s"Data nodes: ${dagSpec.data.size}")
case Left(errors) =>
errors.foreach(e => println(e.format))
}
Using the Builder
import io.constellation.lang.LangCompiler
import io.constellation.lang.semantic._
val compiler = LangCompiler.builder
.withFunction(FunctionSignature(
name = "transform",
params = List("input" -> SemanticType.SString),
returns = SemanticType.SString,
moduleName = "transform-module"
))
.withFunction(FunctionSignature(
name = "score",
params = List(
"data" -> SemanticType.SCandidates(SemanticType.SRecord(Map(
"id" -> SemanticType.SString
))),
"userId" -> SemanticType.SInt
),
returns = SemanticType.SCandidates(SemanticType.SRecord(Map(
"score" -> SemanticType.SFloat
))),
moduleName = "score-module"
))
.build
Compilation Result
final case class CompilationOutput(
pipeline: LoadedPipeline, // The compiled pipeline
warnings: List[CompileWarning] // Non-fatal warnings
)
final case class LoadedPipeline(
image: PipelineImage, // Contains dagSpec + metadata
syntheticModules: Map[UUID, Module.Uninitialized] // Generated modules
)
The syntheticModules map contains automatically generated modules for:
- Merge operations (+)
- Projection operations ([fields])
- Conditional expressions (if/else)
Registering Functions
Functions in constellation-lang map to modules at runtime. Register them with FunctionSignature:
import io.constellation.lang.semantic._
val signature = FunctionSignature(
name = "embed-model", // Name used in constellation-lang
params = List( // Parameter names and types
"input" -> SemanticType.SCandidates(
SemanticType.SRecord(Map(
"text" -> SemanticType.SString
))
)
),
returns = SemanticType.SCandidates( // Return type
SemanticType.SRecord(Map(
"embedding" -> SemanticType.SList(SemanticType.SFloat)
))
),
moduleName = "embed-model" // Constellation module name
)
val registry = FunctionRegistry.empty
registry.register(signature)
Using ModuleBridge
For modules with existing specs, use ModuleBridge:
import io.constellation.lang.ModuleBridge
// Extract types from module spec
val params = ModuleBridge.extractParams(myModule)
val returns = ModuleBridge.extractReturns(myModule)
// Create signature
val sig = ModuleBridge.signatureFromModule(
languageName = "my-function",
module = myModule,
params = params,
returns = returns
)
Creating Modules
Using ModuleBuilder
import io.constellation.ModuleBuilder
import cats.effect.IO
// Define input/output case classes
case class MyInput(data: String, count: Int)
case class MyOutput(result: String)
val module = ModuleBuilder
.metadata(
name = "my-module",
description = "Processes data",
majorVersion = 1,
minorVersion = 0
)
.tags("ml", "processing")
.implementation[MyInput, MyOutput] { input =>
IO.pure(MyOutput(s"Processed: ${input.data} x ${input.count}"))
}
.build
Pure Implementations
For synchronous operations:
val module = ModuleBuilder
.metadata("pure-module", "Pure transformation", 1, 0)
.implementationPure[MyInput, MyOutput] { input =>
MyOutput(input.data.toUpperCase)
}
.build
With Context
Return additional context alongside the result:
import cats.Eval
import io.circe.Json
val module = ModuleBuilder
.metadata("context-module", "With context", 1, 0)
.implementationWithContext[MyInput, MyOutput] { input =>
IO.pure(Module.Produces(
data = MyOutput(input.data),
implementationContext = Eval.later(Map(
"processingTime" -> Json.fromLong(System.currentTimeMillis)
))
))
}
.build
HTTP Endpoint Publishing
Mark a module as callable via POST /modules/{name}/invoke without writing a .cst pipeline:
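// Input/output shapes for this example (field names are illustrative)
case class TextInput(text: String)
case class TextOutput(text: String)
val module = ModuleBuilder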
.metadata("Uppercase", "Convert text to uppercase", 1, 0)
.httpEndpoint() // Published at /modules/Uppercase/invoke
.implementationPure[TextInput, TextOutput] { input =>
TextOutput(input.text.toUpperCase)
}
.build
Discover published modules via GET /modules/published. Optionally pass a custom config:
.httpEndpoint(ModuleHttpConfig(published = false)) // Registered but not published
See the HTTP API Overview for endpoint details.
Configuration
import scala.concurrent.duration._
val module = ModuleBuilder
.metadata("configured-module", "With timeouts", 1, 0)
.inputsTimeout(10.seconds) // Time to wait for inputs
.moduleTimeout(5.seconds) // Time for module execution
.implementation[MyInput, MyOutput](processFunction) // processFunction: MyInput => IO[MyOutput], defined elsewhere
.build
Running Pipelines
Basic Execution
Always manage the Constellation instance through Resource (or its use method) to ensure proper cleanup. The instance manages thread pools and caches that need graceful shutdown. When using IOApp, the runtime handles this automatically. For other contexts, compose with your application's effect lifecycle.
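import cats.effect.IO
import cats.effect.unsafe.implicits.global
import cats.implicits._ // traverse syntax for the modules list
import io.constellation._
// Assumes modules (a List of built modules), compiler, and source are defined as in the earlier sections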
val result = for {
constellation <- impl.ConstellationImpl.init
_ <- modules.traverse(constellation.setModule)
// Compile the pipeline
compiled <- IO.fromEither(
// Report every compile error, not just the first one
compiler.compile(source, "my-dag").left.map(e => new Exception(e.map(_.format).mkString("\n")))
)
// Prepare inputs
inputs = Map(
"x" -> CValue.CInt(42),
"y" -> CValue.CInt(7) // y is declared as Int in the source above
)
// Run the pipeline
sig <- constellation.run(compiled.pipeline, inputs)
} yield sig
val signature = result.unsafeRunSync()
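The cleanup advice at the start of this section can be sketched with cats-effect's Resource. This guide does not show which shutdown hook Constellation exposes, so FakeEngine and its shutdown method below are hypothetical stand-ins. Treat this as the shape of the pattern, not the library's lifecycle API.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"
import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for an engine that owns thread pools and caches.
final class FakeEngine(log: ListBuffer[String]) {
  def run(input: Int): IO[Int] = IO { log += s"run($input)"; input * 2 }
  def shutdown: IO[Unit]       = IO { log += "shutdown"; () }
}

object ResourceSketch {
  // Returns the computed value plus the order in which lifecycle steps ran.
  def demo(): (Int, List[String]) = {
    val log = ListBuffer.empty[String]

    // Resource.make pairs acquisition with release; release also runs if `use` fails.
    val engine: Resource[IO, FakeEngine] =
      Resource.make(IO { log += "init"; new FakeEngine(log) })(_.shutdown)

    // unsafeRunSync is only here so the sketch runs as a script; prefer IOApp in real code.
    val result = engine.use(_.run(21)).unsafeRunSync()
    (result, log.toList)
  }
}
```

Calling ResourceSketch.demo() runs acquisition, the body, and release in that order. In an application, the same Resource would typically be composed into the IOApp's run method rather than run synchronously.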
Accessing Results
val sig: DataSignature = result.unsafeRunSync()
// Check execution status
println(s"Status: ${sig.status}")
println(s"Execution ID: ${sig.executionId}")
// Get declared outputs
sig.outputs.foreach { case (name, value) =>
println(s"Output $name: $value")
}
// Get all computed intermediate values
sig.computedNodes.foreach { case (name, value) =>
println(s"Node $name: $value")
}
// Check for missing inputs
if (sig.missingInputs.nonEmpty)
println(s"Missing inputs: ${sig.missingInputs}")
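Missing or mistyped inputs are easier to diagnose before a run than after it. The following is a minimal, library-independent sketch of such a pre-flight check. InputCheck, Problem, and the use of plain strings as type tags are all assumptions made for illustration. With the real API you would presumably compare each value's ctype against the expected CType instead.

```scala
object InputCheck {
  sealed trait Problem
  final case class Missing(name: String) extends Problem
  final case class WrongType(name: String, expected: String, actual: String) extends Problem

  // declared: input name -> expected type tag
  // provided: input name -> type tag of the value actually supplied
  // Results are sorted by input name so the report is deterministic.
  def check(declared: Map[String, String], provided: Map[String, String]): List[Problem] =
    declared.toList.sortBy(_._1).flatMap { case (name, expected) =>
      provided.get(name) match {
        case None                               => List(Missing(name))
        case Some(actual) if actual != expected => List(WrongType(name, expected, actual))
        case Some(_)                            => Nil
      }
    }
}
```

For a pipeline declaring x: Int and y: Int, supplying a String for y is reported as a WrongType problem, and leaving y out is reported as Missing.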
Working with Types
SemanticType (Compiler)
Used during compilation for type checking:
import io.constellation.lang.semantic.SemanticType
// Primitives
val stringType = SemanticType.SString
val intType = SemanticType.SInt
val floatType = SemanticType.SFloat
val boolType = SemanticType.SBoolean
// Records
val recordType = SemanticType.SRecord(Map(
"id" -> SemanticType.SString,
"count" -> SemanticType.SInt
))
// Parameterized
val listType = SemanticType.SList(SemanticType.SString)
val mapType = SemanticType.SMap(SemanticType.SString, SemanticType.SInt)
val candidatesType = SemanticType.SCandidates(recordType)
CType (Runtime)
Used at runtime for data representation:
import io.constellation.CType
// Primitives
val stringType = CType.CString
val intType = CType.CInt
// Compound types
val productType = CType.CProduct(Map(
"name" -> CType.CString,
"age" -> CType.CInt
))
val listType = CType.CList(CType.CString)
Converting Between Types
import io.constellation.lang.semantic.SemanticType
// SemanticType → CType
val cType = SemanticType.toCType(semanticType)
// CType → SemanticType
val semType = SemanticType.fromCType(cType)
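To illustrate what a structure-preserving conversion between a compile-time and a runtime type model looks like, here is a small self-contained sketch. SType, RType, and Convert are stand-ins that only borrow the constructor names used in this guide. They are not the library's definitions, and the pairing of records with products is an assumption based on those names.

```scala
// Stand-in for the compiler-side type model.
sealed trait SType
object SType {
  case object SString extends SType
  case object SInt extends SType
  final case class SList(element: SType) extends SType
  final case class SRecord(fields: Map[String, SType]) extends SType
}

// Stand-in for the runtime-side type model.
sealed trait RType
object RType {
  case object CString extends RType
  case object CInt extends RType
  final case class CList(element: RType) extends RType
  final case class CProduct(fields: Map[String, RType]) extends RType
}

object Convert {
  // Recurse through the structure, mapping each constructor to its counterpart.
  def toRuntime(t: SType): RType = t match {
    case SType.SString     => RType.CString
    case SType.SInt        => RType.CInt
    case SType.SList(e)    => RType.CList(toRuntime(e))
    case SType.SRecord(fs) => RType.CProduct(fs.map { case (k, v) => k -> toRuntime(v) })
  }

  // Inverse mapping; for these four constructors the round trip is lossless.
  def fromRuntime(t: RType): SType = t match {
    case RType.CString      => SType.SString
    case RType.CInt         => SType.SInt
    case RType.CList(e)     => SType.SList(fromRuntime(e))
    case RType.CProduct(fs) => SType.SRecord(fs.map { case (k, v) => k -> fromRuntime(v) })
  }
}
```

Whether the real conversion round-trips for every type (for example SCandidates) is not stated in this guide, so verify that before relying on it.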
CValue (Runtime Values)
import io.constellation.CValue
// Create values
val stringVal = CValue.CString("hello")
val intVal = CValue.CInt(42)
val floatVal = CValue.CFloat(3.14)
val boolVal = CValue.CBoolean(true)
val listVal = CValue.CList(
List(CValue.CInt(1), CValue.CInt(2), CValue.CInt(3)),
CType.CInt
)
val productVal = CValue.CProduct(Map(
"name" -> CValue.CString("Alice"),
"age" -> CValue.CInt(30)
))
// Access type
val typ: CType = stringVal.ctype
Error Handling
Avoid using unsafeRunSync() in production code. It blocks the current thread and can cause deadlocks. Use IOApp or integrate with your application's effect system for proper async execution.
Compile Errors
compiler.compile(source, "dag") match {
case Right(result) =>
// Success
case Left(errors) =>
errors.foreach {
case e: CompileError.ParseError =>
println(s"Syntax error: ${e.format}")
case e: CompileError.UndefinedVariable =>
println(s"Unknown variable '${e.name}' at ${e.position}")
case e: CompileError.UndefinedType =>
println(s"Unknown type '${e.name}' at ${e.position}")
case e: CompileError.UndefinedFunction =>
println(s"Unknown function '${e.name}' at ${e.position}")
case e: CompileError.TypeMismatch =>
println(s"Expected ${e.expected}, got ${e.actual} at ${e.position}")
case e: CompileError.InvalidProjection =>
println(s"Field '${e.field}' not found. Available: ${e.availableFields}")
case e: CompileError.IncompatibleMerge =>
println(s"Cannot merge ${e.leftType} + ${e.rightType}")
case e =>
println(e.format)
}
}
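When compile errors have to be raised as a single exception (as in the IO.fromEither calls elsewhere in this guide), it helps to keep every message rather than only the first. A minimal sketch of such a helper follows. CompileErrors.toException is a hypothetical name, and the render parameter stands in for the format method so the block has no library dependency.

```scala
object CompileErrors {
  // Collapse a list of errors into one exception whose message lists all of them.
  // `render` turns a single error into text, e.g. `_.format` for compile errors.
  def toException[E](errors: List[E])(render: E => String): Exception = {
    val header = s"${errors.size} compile error(s)"
    new Exception((header :: errors.map(render)).mkString("\n"))
  }
}
```

With the compiler, a call would look roughly like compiler.compile(source, "dag").left.map(es => CompileErrors.toException(es)(_.format)), assuming the error collection is a List (convert with toList first if it is not).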
Runtime Errors
import cats.effect.IO
val result: IO[DataSignature] = constellation.run(compiled.pipeline, inputs)
result.attempt.map {
case Right(sig) =>
sig.status match {
case PipelineStatus.Completed =>
println(s"Success: ${sig.outputs}")
case PipelineStatus.Failed(errors) =>
errors.foreach(e => println(s"Node ${e.nodeName} failed: ${e.message}"))
case PipelineStatus.Suspended =>
println(s"Suspended, missing: ${sig.missingInputs}")
}
case Left(error) =>
// Global execution failure
println(s"Execution failed: ${error.getMessage}")
}
Complete Example
import cats.effect.{IO, IOApp}
import cats.implicits._
import io.constellation._
import io.constellation.impl.ConstellationImpl
import io.constellation.lang.LangCompiler
import io.constellation.lang.semantic._
object MyPipeline extends IOApp.Simple {
// Define module
case class ProcessInput(data: String)
case class ProcessOutput(result: String)
val processModule = ModuleBuilder
.metadata("process", "Process data", 1, 0)
.implementationPure[ProcessInput, ProcessOutput] { input =>
ProcessOutput(input.data.toUpperCase)
}
.build
// Create compiler
val compiler = LangCompiler.builder
.withFunction(FunctionSignature(
name = "process",
params = List("data" -> SemanticType.SString),
returns = SemanticType.SString,
moduleName = "process"
))
.build
val source = """
in data: String
result = process(data)
out result
"""
def run: IO[Unit] = for {
// Initialize
constellation <- ConstellationImpl.init
_ <- constellation.setModule(processModule)
// Compile
compiled <- IO.fromEither(
compiler.compile(source, "my-pipeline")
.left.map(e => new Exception(e.map(_.format).mkString("\n")))
)
// Run
sig <- constellation.run(
compiled.pipeline,
Map("data" -> CValue.CString("hello world"))
)
// Output
_ <- IO.println(s"Status: ${sig.status}")
_ <- IO.println(s"Results: ${sig.outputs}")
} yield ()
}
Cross-Process Modules with ModuleProviderManager
For modules that need their own process, language runtime, or independent scaling, wrap your Constellation instance with ModuleProviderManager. This enables external services to register pipeline modules via gRPC while keeping the same programmatic API.
import io.constellation.provider.{ModuleProviderManager, ProviderManagerConfig, JsonCValueSerializer}
import io.constellation.impl.ConstellationImpl
import io.constellation.lang.LangCompiler
for {
// 1. Create base Constellation instance
constellation <- ConstellationImpl.builder().build()
// 2. Create compiler
compiler = LangCompiler.builder.build // build returns a plain value elsewhere in this guide, so bind with =
// 3. Wrap with module provider support
manager <- ModuleProviderManager(
delegate = constellation,
compiler = compiler,
config = ProviderManagerConfig(), // gRPC port 9090 by default
serializer = JsonCValueSerializer
)
// 4. Use 'manager' everywhere you'd use 'constellation'
// It delegates all Constellation methods and also accepts gRPC registrations
_ <- manager.setModule(myInProcessModule) // In-process modules still work
// 5. Optionally start HTTP server with provider support
_ <- ConstellationServer.builder(manager, compiler).run
// Now: HTTP API on port 8080, gRPC provider service on port 9090
} yield ()
When to use ModuleProviderManager:
| Scenario | Use ConstellationImpl | Use ModuleProviderManager |
|---|---|---|
| All modules are in-process Scala | Yes | No |
| Need external modules (Python, Go, etc.) | No | Yes |
| Need horizontally scaled module pools | No | Yes |
| Need independent module deployment | No | Yes |
ModuleProviderManager is a drop-in wrapper — it implements Constellation and delegates all calls to the underlying instance. Add the module-provider dependency to your project:
libraryDependencies += "io.github.vledicfranco" %% "constellation-module-provider-sdk" % constellationVersion
See the Module Provider Integration Guide for the full protocol, SDK setup, and configuration options.
Next Steps
- HTTP API Overview — REST API for polyglot architectures
- Standard Library — Built-in functions to register with your compiler
- Error Reference — Compile and runtime error handling
- Technical Architecture — How pipelines are compiled and executed
- Cache Backend — Implement custom caching for module results
- Module Provider — Cross-process modules via gRPC