Core Concepts

This page introduces the key concepts behind Constellation Engine. Read this before diving into the tutorial or language reference.

What Constellation Engine Is

Constellation Engine is a type-safe pipeline orchestration framework for Scala 3. It lets you define data-processing pipelines in a declarative DSL (constellation-lang), implement the underlying functions in Scala, and execute them with automatic parallelization, type checking, and resilience.

Two Layers

Constellation separates what your pipeline does from how it's implemented:

| Layer | Role | Format |
|---|---|---|
| constellation-lang | Declarative pipeline definitions | `.cst` files (hot-reloadable) |
| Scala runtime | Module implementations, execution engine | Scala 3 + Cats Effect |

Pipeline authors write .cst files that reference named modules. Module developers implement those modules in Scala with full access to the JVM ecosystem (HTTP clients, databases, ML libraries).

Core Concepts

Module

A Module is the basic unit of computation. Each module has:

  • A name (PascalCase, e.g., FetchCustomer)
  • Input parameters with typed fields
  • Output fields with known types
  • An implementation (pure function or IO effect)
```scala
val fetchCustomer = ModuleBuilder
  .metadata("FetchCustomer", "Fetch customer data", 1, 0)
  .implementation[CustomerInput, CustomerOutput] { input =>
    IO { httpClient.get(s"/customers/${input.customerId}") }
  }
  .build
```
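Once built, a module must be made available to the compiler so that `.cst` files can reference it by name. A minimal sketch, assuming a `ModuleRegistry` with a `register` method (the registration API name is an assumption, not the framework's confirmed interface):

```scala
// Hypothetical registration sketch -- ModuleRegistry and register are assumed names.
val registry = ModuleRegistry.empty
  .register(fetchCustomer)  // FetchCustomer(...) now resolves in .cst files
```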

Pipeline

A Pipeline is a directed acyclic graph (DAG) of module calls, declared in constellation-lang:

```
in order: { id: String, customerId: String }

customer = FetchCustomer(order.customerId)
shipping = EstimateShipping(order.id)

out order + customer + shipping
```

The compiler resolves dependencies between module calls, and the runtime executes independent branches in parallel.

DagSpec

A DagSpec is the compiled representation of a pipeline. It contains:

  • Module nodes (Map[UUID, ModuleNodeSpec]) -- each module call in the pipeline
  • Data nodes (Map[UUID, DataNodeSpec]) -- each value flowing between modules
  • Edges -- connections between data nodes and module nodes (inputs/outputs)
  • Declared outputs -- the pipeline's output variables
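The bullet points above suggest a structure roughly like the following. This is an illustrative sketch only; field names beyond those listed, and the shapes of `ModuleNodeSpec`, `DataNodeSpec`, and `Edge`, are assumptions:

```scala
import java.util.UUID

// Illustrative sketch of a compiled pipeline's shape -- not the framework's actual definition.
final case class DagSpec(
  moduleNodes: Map[UUID, ModuleNodeSpec], // each module call in the pipeline
  dataNodes:   Map[UUID, DataNodeSpec],   // each value flowing between modules
  edges:       List[Edge],                // data-node <-> module-node connections
  outputs:     List[String]               // the pipeline's declared output variables
)
```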

Type System (CType / CValue)

The runtime type system ensures type safety across the entire pipeline:

CType (types):

  • Primitives: CString, CInt, CFloat, CBoolean
  • Collections: CList, CMap
  • Structured: CProduct (records), CUnion (tagged unions)
  • Optional: COptional

CValue (values):

  • Every runtime value carries its CType, enabling type checking at DAG boundaries
  • Automatic derivation maps Scala case classes to/from CType and CValue
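As a rough illustration of a record value carrying its type, using the type names listed above (the constructor shapes here are assumptions, not the framework's exact API):

```scala
// Hypothetical constructors -- the real CType/CValue API may differ.
val customerType = CProduct(Map(
  "id"  -> CString,
  "age" -> CInt
))
val customer = CValue.product(customerType, Map(
  "id"  -> CValue.string("c-42"),
  "age" -> CValue.int(29)
))
```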
Type Safety Benefit

The CType/CValue system ensures type mismatches are caught at pipeline boundaries, not deep inside module implementations. This makes debugging significantly easier.

Pipeline Lifecycle

A constellation-lang source file goes through several stages before execution:

```
Source (.cst)
      |
      v
Parse           -- text -> AST
      |
      v
Type Check      -- validate types, infer missing types
      |
      v
IR Generate     -- typed AST -> intermediate representation
      |
      v
Optimize        -- dead code elimination, constant folding, CSE
      |
      v
DAG Compile     -- IR -> DagSpec + synthetic modules
      |
      v
PipelineImage   -- immutable snapshot (structural hash, DagSpec, options)
      |
      v
LoadedPipeline  -- image + runtime module instances
      |
      v
Execute         -- parallel DAG traversal -> DataSignature (results)
```

Each stage catches different classes of errors: the parser catches syntax errors, the type checker catches field typos and type mismatches, and the DAG compiler validates module availability.
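In code, the stages above might be driven roughly like this. The object and method names are assumptions for illustration, chained in a Cats Effect for-comprehension consistent with the runtime:

```scala
// Hypothetical driver for the lifecycle stages -- all names are assumptions.
for {
  ast    <- Parser.parse(source)         // Parse: text -> AST
  typed  <- TypeChecker.check(ast)       // Type Check: validate and infer types
  ir     <- IRGen.generate(typed)        // IR Generate: typed AST -> IR
  opt     = Optimizer.run(ir)            // Optimize: DCE, constant folding, CSE
  spec   <- DagCompiler.compile(opt)     // DAG Compile: IR -> DagSpec
  image   = PipelineImage.of(spec)       // Immutable snapshot with structural hash
  loaded <- image.load(moduleRegistry)   // Bind runtime module instances
  result <- loaded.execute(inputs)       // Parallel DAG traversal -> DataSignature
} yield result
```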

Early Error Detection

Errors are caught as early as possible in the pipeline. Syntax errors fail at parse time, type errors at type-check time, and missing modules at DAG compile time. You will never see a "field not found" error at runtime.

Content-Addressed Storage

Compiled pipelines are stored using content addressing:

  • Structural hash -- SHA-256 of the canonicalized DagSpec (independent of UUIDs). Two pipelines with identical logic produce the same hash.
  • Syntactic hash -- SHA-256 of the source text. Used for cache-hit detection: if the source and module registry haven't changed, the existing compiled pipeline is reused.
  • Aliases -- Human-readable names that point to a structural hash. You can repoint an alias to a different version without changing client code.
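The distinction between the two hashes matters: the syntactic hash changes whenever the source text changes at all, while the structural hash is stable across cosmetic edits. As a minimal sketch of computing such a hash over an already-canonicalized string (this uses plain `java.security`, not the framework's actual canonicalization logic):

```scala
import java.security.MessageDigest

// Hex-encoded SHA-256 of a canonical representation.
def sha256Hex(canonical: String): String =
  MessageDigest.getInstance("SHA-256")
    .digest(canonical.getBytes("UTF-8"))
    .map("%02x".format(_))
    .mkString

// Two pipelines whose canonicalized DagSpecs are identical strings
// produce the same structural hash, even if their raw source differs.
```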

The PipelineStore trait provides:

  • store(image) -- save a compiled pipeline, returns structural hash
  • get(hash) -- retrieve by structural hash
  • alias(name, hash) -- create or update a named alias
  • resolve(name) -- look up alias to structural hash
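Based on the four operations listed above, the trait looks roughly like this. Wrapping results in `IO` and in `Option` for lookups is an assumption consistent with the Cats Effect runtime, not a confirmed signature:

```scala
import cats.effect.IO

// Sketch of the PipelineStore operations listed above; exact signatures may differ.
trait PipelineStore:
  def store(image: PipelineImage): IO[String]      // save; returns structural hash
  def get(hash: String): IO[Option[PipelineImage]] // retrieve by structural hash
  def alias(name: String, hash: String): IO[Unit]  // create or update an alias
  def resolve(name: String): IO[Option[String]]    // alias -> structural hash
```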

Execution Modes

Hot Pipeline (compile + run)

Send source code to the /run endpoint. The server compiles and executes in one step:

```
curl -X POST http://localhost:8080/run \
  -d '{"source": "in x: Int\nout x", "inputs": {"x": 42}}'
```

Cold Pipeline (store + execute by reference)

Compile once via /compile, then execute by name or hash via /execute:

```
# Compile and store
curl -X POST http://localhost:8080/compile \
  -d '{"source": "...", "name": "my-pipeline"}'

# Execute by name
curl -X POST http://localhost:8080/execute \
  -d '{"ref": "my-pipeline", "inputs": {"x": 42}}'

# Execute by structural hash
curl -X POST http://localhost:8080/execute \
  -d '{"ref": "sha256:abc123...", "inputs": {"x": 42}}'
```

Suspend and Resume

Pipelines support partial execution. If some inputs are missing, the pipeline suspends instead of failing:

  1. The runtime executes all modules whose inputs are available
  2. Modules waiting on missing inputs are left in Unfired state
  3. A SuspendedExecution snapshot is saved with the execution ID
  4. The caller can later resume with the missing inputs via POST /executions/{id}/resume

This enables incremental execution -- provide inputs as they become available, and the pipeline picks up where it left off.
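Following the curl style used above, resuming a suspended execution might look like this. The execution ID and the input name are placeholders, not values from a real session:

```
# Resume a suspended execution, supplying the inputs that were missing
curl -X POST http://localhost:8080/executions/{id}/resume \
  -d '{"inputs": {"approval": true}}'
```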

Use Case

Suspend and resume is ideal for long-running workflows where some inputs arrive asynchronously (e.g., user approval, external API callback, file upload completion).

Resilience

Modules support declarative resilience options via the with clause in constellation-lang:

```
result = SlowService(data) with {
  timeout: 5000,
  retry: 3,
  backoff: "exponential",
  fallback: DefaultValue(data),
  cache: 60000
}
```

Available options:

| Option | Purpose |
|---|---|
| retry | Max retry count on failure |
| timeout | Execution timeout (ms) |
| delay | Delay before execution (ms) |
| backoff | Retry strategy: fixed, linear, exponential |
| fallback | Alternative module call on failure |
| cache | Cache TTL (ms) |
| cache_backend | Named cache backend (e.g., redis) |
| throttle | Rate limiting (count per time window) |
| concurrency | Max concurrent instances |
| on_error | Error strategy: propagate, skip, log, wrap |
| lazy | Defer execution until result is needed |
| priority | Execution priority (0-100) |

HTTP API

The server exposes several endpoint groups:

| Group | Endpoints | Purpose |
|---|---|---|
| Health | /health, /health/live, /health/ready | Liveness and readiness probes |
| Compile & Run | /compile, /run | Compile and/or execute pipelines |
| Pipeline Management | /pipelines, /pipelines/{ref}, /execute | Store, list, and execute pipelines |
| Suspension | /executions, /executions/{id}/resume | Manage suspended executions |
| Versioning | /pipelines/{name}/reload, /pipelines/{name}/versions | Hot-reload and version history |
| Canary | /pipelines/{name}/canary | Canary deployments with traffic splitting |
| Modules | /modules | List registered modules |
| Metrics | /metrics | Cache stats and execution counts |
| LSP | /lsp (WebSocket) | Language Server Protocol for IDEs |

See the HTTP API Overview for full endpoint documentation.

Tooling

  • VSCode Extension -- Syntax highlighting, autocomplete, inline errors, hover types, DAG visualization, and one-click execution via the Language Server Protocol.
  • Web Dashboard -- Browser-based UI for browsing files, running pipelines, viewing DAG graphs, and inspecting execution history. Served by the HTTP server.
  • LSP -- Standard Language Server Protocol support. Works with any LSP-compatible editor.

Next Steps