Module Provider Integration Guide

Overview

The Module Provider Protocol lets external services register pipeline modules with a running Constellation instance via gRPC. This enables modules written in any language, running in separate processes, with independent scaling and deployment lifecycles.

Use cases:

  • ML inference services exposing models as pipeline modules
  • Microservices contributing domain-specific transformations
  • Polyglot pipelines (Python, Go, Rust modules alongside Scala)
  • Horizontally scaled module pools with load balancing

When to Use

Use Module Providers when your module needs its own process, language runtime, or scaling strategy. For modules that run in the same JVM, use the standard ModuleBuilder API instead — it's simpler and faster.

How It Works

┌── Provider Process ──────────────────────────────────────┐
│  ConstellationProvider  ←→  ModuleExecutorServer (gRPC)  │
└────────────┬────────────────────────▲────────────────────┘
             │ Register / Heartbeat   │ Execute
             ▼                        │
┌── Constellation Server ─────────────┴────────────────────┐
│  ModuleProviderManager  →  ExternalModule (in pipeline)  │
└──────────────────────────────────────────────────────────┘
  1. Register — Provider sends module declarations (name, input/output types) to the server
  2. Execute — When a pipeline calls the module, the server sends an ExecuteRequest to the provider
  3. Heartbeat — Provider maintains a control plane stream with periodic heartbeats
  4. Deregister — Provider gracefully removes its modules on shutdown
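The four steps above map onto a small control-plane message vocabulary. A sketch of that vocabulary as a plain ADT (all names here are illustrative stand-ins, not the actual protobuf message types):

```scala
// Illustrative control-plane messages; not the real protobuf definitions.
sealed trait ProviderMessage
case class Register(namespace: String, modules: List[String]) extends ProviderMessage
case class ExecuteRequest(module: String, payload: Array[Byte]) extends ProviderMessage
case object Heartbeat extends ProviderMessage
case class Deregister(namespace: String) extends ProviderMessage

// A typical provider session, in order:
val session: List[ProviderMessage] = List(
  Register("ml", List("Analyze")),
  Heartbeat,
  ExecuteRequest("Analyze", Array.emptyByteArray),
  Heartbeat,
  Deregister("ml")
)
```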

SDK Quick Start

Add the SDK dependency to your build.sbt:

libraryDependencies += "io.github.vledicfranco" %% "constellation-module-provider-sdk" % "0.7.0"

Defining Modules

import io.constellation.{CType, CValue}
import io.constellation.provider.sdk._
import io.constellation.provider.JsonCValueSerializer
import io.grpc.ManagedChannelBuilder
import cats.effect.{IO, IOApp}

object MyProvider extends IOApp.Simple {

  val analyzeModule = ModuleDefinition(
    name = "Analyze",
    inputType = CType.CProduct(Map("text" -> CType.CString)),
    outputType = CType.CProduct(Map(
      "sentiment"  -> CType.CFloat,
      "confidence" -> CType.CFloat
    )),
    version = "1.0.0",
    description = "Sentiment analysis",
    handler = { input =>
      val text = input.asInstanceOf[CValue.CProduct].values("text")
        .asInstanceOf[CValue.CString].value
      // Your analysis logic here
      IO.pure(CValue.CProduct(Map(
        "sentiment"  -> CValue.CFloat(0.85),
        "confidence" -> CValue.CFloat(0.92)
      )))
    }
  )

  def run: IO[Unit] = {
    for {
      provider <- ConstellationProvider.create(
        namespace = "ml",
        instances = List("localhost:9090"),
        config = SdkConfig(),
        transportFactory = { addr =>
          val Array(host, port) = addr.split(":")
          val channel = ManagedChannelBuilder.forAddress(host, port.toInt).usePlaintext().build()
          new GrpcProviderTransport(channel)
        },
        executorServerFactory = new GrpcExecutorServerFactory(),
        serializer = JsonCValueSerializer
      )
      _ <- provider.register(analyzeModule)
      _ <- provider.start.useForever
    } yield ()
  }
}

Once running, the module is available in constellation-lang:

in text: String
result = ml.Analyze(text)
out result

Configuration

import io.constellation.provider.sdk.{SdkConfig, CanaryConfig}
import scala.concurrent.duration._

val config = SdkConfig(
  executorPort = 9091,              // Port for receiving ExecuteRequests
  heartbeatInterval = 5.seconds,
  reconnectBackoff = 1.second,
  maxReconnectBackoff = 60.seconds,
  maxReconnectAttempts = 10,
  groupId = None,                   // Set for horizontal scaling
  canary = CanaryConfig(
    observationWindow = 30.seconds,
    healthThreshold = 0.95,
    rollbackOnFailure = true
  )
)

Parameter             Default  Description
executorPort          9091     Port the provider listens on for ExecuteRequests
heartbeatInterval     5s       How often heartbeats are sent
reconnectBackoff      1s       Initial delay before a reconnection attempt
maxReconnectBackoff   60s      Maximum delay between reconnection attempts
maxReconnectAttempts  10       Give up after this many consecutive failures
groupId               None     Shared group ID for horizontal scaling
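The reconnect settings describe a capped exponential backoff between attempts. A minimal sketch of the schedule these defaults imply (the doubling growth factor is an assumption for illustration; the SDK's actual curve may differ):

```scala
import scala.concurrent.duration._

// Capped exponential backoff: the delay doubles per attempt up to a maximum.
// The doubling factor is an assumption, not taken from the SDK source.
def backoffSchedule(
    initial: FiniteDuration,
    max: FiniteDuration,
    attempts: Int
): List[FiniteDuration] =
  List.tabulate(attempts) { n =>
    val d = initial * math.pow(2, n).toLong
    if (d > max) max else d
  }

// With the defaults above: 1s, 2s, 4s, 8s, 16s, 32s, then capped at 60s
val schedule = backoffSchedule(1.second, 60.seconds, 10)
```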

Server-Side Setup

The server needs ModuleProviderManager wrapping a Constellation instance:

import io.constellation.provider.{ModuleProviderManager, ProviderManagerConfig}
import io.constellation.provider.JsonCValueSerializer
import io.constellation.impl.ConstellationImpl

for {
  constellation <- ConstellationImpl.builder().build()
  compiler      <- LangCompiler.builder.build
  manager <- ModuleProviderManager(
    delegate = constellation,
    compiler = compiler,
    config = ProviderManagerConfig(),
    serializer = JsonCValueSerializer
  )
  // manager is now a Constellation that also accepts gRPC registrations on port 9090
} yield manager

Combined HTTP API + gRPC Server

To run both the HTTP API and the gRPC provider service, pass the manager to ConstellationServer.builder():

import io.constellation.http.ConstellationServer
import io.constellation.stdlib.StdLib
import cats.implicits._

for {
  constellation <- ConstellationImpl.builder().build()
  _             <- StdLib.allModules.values.toList.traverse(constellation.setModule)
  compiler      <- LangCompiler.builder.build
  manager <- ModuleProviderManager(
    delegate = constellation,
    compiler = compiler,
    config = ProviderManagerConfig(),
    serializer = JsonCValueSerializer
  )
  // HTTP API on port 8080, gRPC provider service on port 9090
  _ <- ConstellationServer.builder(manager, compiler).run
} yield ()

ModuleProviderManager implements the Constellation trait, so it's a drop-in replacement anywhere you use a Constellation instance. In-process modules registered via setModule continue to work alongside external gRPC-provided modules.

Server Configuration

Parameter                    Default  Env Variable
grpcPort                     9090     CONSTELLATION_PROVIDER_PORT
heartbeatTimeout             15s      CONSTELLATION_PROVIDER_HEARTBEAT_TIMEOUT
controlPlaneRequiredTimeout  30s      CONSTELLATION_PROVIDER_CONTROL_PLANE_TIMEOUT
reservedNamespaces           stdlib   CONSTELLATION_PROVIDER_RESERVED_NS

Type System

Module providers declare input and output types using protobuf TypeSchema messages. The SDK converts between Constellation's CType and protobuf automatically.

Supported Types

CType      Protobuf Representation
CString    PrimitiveType.STRING
CInt       PrimitiveType.INT
CFloat     PrimitiveType.FLOAT
CBoolean   PrimitiveType.BOOL
CProduct   RecordType with named fields
CList      ListType with element type
CMap       MapType with key and value types
COptional  OptionType with inner type
CUnion     UnionType with variant types
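The mapping is structural and recursive: container types carry the schemas of their elements. A sketch of how such a conversion composes, using simplified stand-in types rather than the real CType and protobuf classes (every name below is illustrative):

```scala
// Simplified stand-ins for CType and the wire schema; illustrative only.
sealed trait CType
object CType {
  case object CString extends CType
  case object CInt extends CType
  case object CFloat extends CType
  case class CList(elem: CType) extends CType
  case class CProduct(fields: Map[String, CType]) extends CType
}

// The wire schema is rendered as a string here for simplicity; the real
// SDK emits protobuf TypeSchema messages instead.
def toSchema(t: CType): String = t match {
  case CType.CString          => "STRING"
  case CType.CInt             => "INT"
  case CType.CFloat           => "FLOAT"
  case CType.CList(e)         => s"list<${toSchema(e)}>"
  case CType.CProduct(fields) =>
    fields.toList.sortBy(_._1)
      .map { case (k, v) => s"$k: ${toSchema(v)}" }
      .mkString("record{", ", ", "}")
}
```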

Serialization

CValues are serialized to JSON bytes for transport using JsonCValueSerializer. This supports all CValue variants and provides a human-readable wire format for debugging.

import io.constellation.provider.{CValueSerializer, JsonCValueSerializer}

val serializer: CValueSerializer = JsonCValueSerializer

// Serialize
val bytes: Either[String, Array[Byte]] = serializer.serialize(myValue)

// Deserialize (only if serialization succeeded)
val value: Either[String, CValue] = bytes.flatMap(serializer.deserialize)

Horizontal Scaling (Provider Groups)

Multiple provider instances can serve the same namespace by sharing a groupId:

val config = SdkConfig(groupId = Some("ml-pool"))

The server maintains an ExecutorPool that load-balances ExecuteRequests across group members using round-robin selection.

Provider A (group_id: "ml-pool")  ──┐
Provider B (group_id: "ml-pool") ──┼── namespace: "ml" → round-robin
Provider C (group_id: "ml-pool") ──┘
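Round-robin selection over group members can be sketched in a few lines (a minimal illustration, not the server's actual ExecutorPool implementation):

```scala
import java.util.concurrent.atomic.AtomicLong

// Minimal round-robin pool; illustrative, not the server's ExecutorPool.
final class RoundRobinPool[A](members: Vector[A]) {
  private val counter = new AtomicLong(0)
  def next(): A = members((counter.getAndIncrement() % members.size).toInt)
}

val pool = new RoundRobinPool(Vector("providerA", "providerB", "providerC"))
// Successive calls cycle through the members: A, B, C, A, ...
val picks = List.fill(4)(pool.next())
```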

Rules:

  • All group members must register the same modules with compatible schemas
  • Solo providers (no groupId) have exclusive namespace ownership
  • A solo provider cannot join a group namespace, and vice versa
  • When the last group member disconnects, the namespace is released

Canary Rollout

Safely upgrade modules across all connected instances:

val newModules = List(analyzeModuleV2)
val result = provider.canaryRollout(newModules).unsafeRunSync()

result match {
  case CanaryResult.Promoted =>
    println("All instances upgraded successfully")
  case CanaryResult.RolledBack(reason) =>
    println(s"Rollback: $reason")
  case CanaryResult.PartialFailure(promoted, failed) =>
    println(s"Partial: ${promoted.size} ok, ${failed.size} failed")
}

The coordinator rolls out to one instance at a time, waits for the observation window, checks health, and either promotes or rolls back all instances.
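The per-instance decision reduces to a success-ratio check against the configured healthThreshold. A sketch of the one-at-a-time loop under that assumption (illustrative; the real coordinator also tracks latency and performs the actual upgrade and rollback calls):

```scala
// Promote an instance only if its success ratio over the observation
// window meets the threshold (0.95 in the default config).
def healthy(successes: Int, total: Int, threshold: Double): Boolean =
  total > 0 && successes.toDouble / total >= threshold

// Visit instances one at a time; stop and signal rollback on first failure.
// `observations` pairs (successes, total) per instance, in rollout order.
def rollout(observations: List[(Int, Int)], threshold: Double): Either[String, Int] =
  observations.zipWithIndex.foldLeft[Either[String, Int]](Right(0)) {
    case (Right(promoted), ((ok, total), idx)) =>
      if (healthy(ok, total, threshold)) Right(promoted + 1)
      else Left(s"instance $idx below threshold, rolling back")
    case (failed, _) => failed
  }
```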

Namespace Rules

Providers register modules under a namespace (e.g., ml, data.transform). Namespaces:

  • Use dot-separated segments: ml, data.transform, company.service
  • Each segment must start with a letter and contain only letters, digits, and underscores
  • Are case-sensitive
  • Cannot use reserved prefixes (stdlib by default)
  • Are exclusively owned by one provider or one provider group
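The syntactic rules above can be expressed as a compact validation check (a sketch of the stated rules; the server's actual validation code may differ in details):

```scala
// Each dot-separated segment must start with a letter and contain only
// letters, digits, and underscores; reserved prefixes are rejected.
def validNamespace(ns: String, reserved: Set[String] = Set("stdlib")): Boolean = {
  val segments = ns.split("\\.", -1).toList  // -1 keeps trailing empty segments
  segments.nonEmpty &&
  segments.forall(_.matches("[A-Za-z][A-Za-z0-9_]*")) &&
  !reserved.contains(segments.head)
}
```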

In constellation-lang, namespaced modules are called with dot notation:

result = ml.Analyze(text)
enriched = data.transform.Enrich(record)

Validation

The server validates every registration request:

Check                 Error
Namespace syntax      Must be valid dot-separated identifiers
Namespace ownership   Cannot register in another provider's namespace
Reserved namespace    Cannot use the stdlib prefix
Executor URL format   Must be valid host:port (no scheme prefix)
Module name format    Must start with a letter; alphanumeric and underscores only
Type schema validity  Input and output schemas must be well-formed

Failed validations return a RegisterResponse with success = false and a rejection reason per module.

Connection Lifecycle

Provider                             Server
   │                                   │
   ├── Register ──────────────────────→│  Validates, creates ExternalModule
   │                                   │
   ├── ControlPlane (open stream) ────→│  Transitions to Active
   │                                   │
   ├── Heartbeat ─────────────────────→│  Updates lastHeartbeatAt
   │←── HeartbeatAck ──────────────────┤
   │←── ActiveModulesReport ───────────┤
   │                                   │
   │←── DrainRequest ──────────────────┤  (graceful shutdown)
   ├── DrainAck ──────────────────────→│
   │                                   │
   ├── Deregister ────────────────────→│  Removes modules
   │                                   │

If the provider stops sending heartbeats, the server auto-deregisters its modules after heartbeatTimeout.
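The liveness rule amounts to comparing each provider's last heartbeat timestamp against the timeout. A sketch of that check (illustrative, not the server's actual bookkeeping):

```scala
import java.time.Instant
import scala.concurrent.duration._

// A provider is stale once its last heartbeat is older than the timeout
// (15s by default on the server side).
def isStale(lastHeartbeatAt: Instant, now: Instant, timeout: FiniteDuration): Boolean =
  java.time.Duration.between(lastHeartbeatAt, now).toMillis > timeout.toMillis

val now = Instant.parse("2024-01-01T00:00:20Z")
val stale = isStale(Instant.parse("2024-01-01T00:00:00Z"), now, 15.seconds) // 20s ago: stale
val fresh = isStale(Instant.parse("2024-01-01T00:00:10Z"), now, 15.seconds) // 10s ago: fine
```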

Gotchas

  • gRPC port vs HTTP port: The Module Provider gRPC service (default 9090) is separate from the HTTP API (default 8080). Providers connect to the gRPC port.
  • Executor URL format: Use host:port without a scheme prefix. grpc:// or http:// will be rejected.
  • Namespace persistence: Namespaces are not persisted. If the server restarts, providers must re-register.
  • Latency overhead: Cross-process modules add network round-trip latency. Use in-process modules for latency-critical operations.
  • Serialization cost: CValues are serialized to JSON bytes for transport. Complex nested types incur serialization overhead.

TypeScript SDK Quick Start

Install the npm package:

npm install @constellation-engine/provider-sdk

Define and start a provider:

import {
  ConstellationProvider,
  CTypes,
  CValues,
  GrpcProviderTransport,
  GrpcExecutorServerFactory,
} from "@constellation-engine/provider-sdk";

const provider = await ConstellationProvider.create({
  namespace: "ml",
  instances: ["localhost:9090"],
  transportFactory: (addr) => {
    const [host, port] = addr.split(":");
    return new GrpcProviderTransport(host, parseInt(port));
  },
  executorServerFactory: new GrpcExecutorServerFactory(),
});

provider.register({
  name: "Analyze",
  inputType: CTypes.product({ text: CTypes.string() }),
  outputType: CTypes.product({
    sentiment: CTypes.float(),
    confidence: CTypes.float(),
  }),
  version: "1.0.0",
  description: "Sentiment analysis",
  handler: async (input) => {
    // Your analysis logic here
    return CValues.product(
      {
        sentiment: CValues.float(0.85),
        confidence: CValues.float(0.92),
      },
      {
        sentiment: CTypes.float(),
        confidence: CTypes.float(),
      },
    );
  },
});

await provider.start();
console.log("Provider running on port 9091");

process.on("SIGINT", async () => {
  await provider.stop();
  process.exit(0);
});

The module is then available in constellation-lang as ml.Analyze(text).

TypeScript SDK Configuration

const provider = await ConstellationProvider.create({
  namespace: "ml",
  instances: ["host1:9090", "host2:9090"],
  config: {
    executorPort: 9091,
    heartbeatIntervalMs: 5000,
    reconnectBackoffMs: 1000,
    maxReconnectBackoffMs: 60000,
    maxReconnectAttempts: 10,
    groupId: "ml-pool", // For horizontal scaling
    canary: {
      observationWindowMs: 30000,
      healthThreshold: 0.95,
      maxLatencyMs: 5000,
      rollbackOnFailure: true,
    },
  },
  transportFactory: (addr) => { /* ... */ },
  executorServerFactory: new GrpcExecutorServerFactory(),
});