
Module Provider Integration

Build external services that register pipeline modules with a running Constellation instance via gRPC. Modules can be written in any language, run in separate processes, and scale independently.

When to Use

| Scenario | Module Provider | In-Process Module |
|---|---|---|
| Module runs in a separate process | Yes | No |
| Module needs independent scaling | Yes | No |
| Module written in Python/Go/Rust | Yes | No |
| Module runs in the same JVM | No | Yes |
| Sub-millisecond latency required | No | Yes |

Decision rule: If your module needs its own process, language runtime, or scaling strategy, use Module Provider. Otherwise, use ModuleBuilder for in-process modules.

Architecture

Provider Process                     Constellation Server
┌───────────────────────┐            ┌──────────────────────────┐
│ ConstellationProvider │──Register─>│ ModuleProviderManager    │
│                       │─Heartbeat─>│ ControlPlaneManager      │
│ ModuleExecutorServer  │<─Execute───│ ExternalModule           │
│ (handles requests)    │            │ ExecutorPool (LB)        │
└───────────────────────┘            └──────────────────────────┘

Flow:

  1. Provider calls Register RPC with namespace, executor URL, and module declarations
  2. Server validates schemas, creates ExternalModule per module, registers with ModuleRegistry
  3. Provider opens ControlPlane stream for heartbeats
  4. When a pipeline calls the module, server sends ExecuteRequest to provider's executor
  5. Provider runs handler, returns result
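To see how registration and execution fit together, the five steps can be modeled in memory. This is a hypothetical, self-contained sketch: `ToyServer`, `RegisterRequest`, `Handler`, and the `Map[String, String]` payloads are illustrative stand-ins, not Constellation types. Real calls travel over gRPC, and step 3 (the ControlPlane stream) is left out.

```scala
object FlowModel {
  // Hypothetical stand-in for a module handler: input record -> output record.
  type Handler = Map[String, String] => Map[String, String]

  // Step 1: what a provider sends in its Register call (simplified).
  final case class RegisterRequest(
      namespace: String,
      executorUrl: String,
      modules: Map[String, Handler]
  )

  final class ToyServer {
    // Step 2: one entry per "namespace.Module", playing the role of an ExternalModule.
    private var registry = Map.empty[String, Handler]

    def register(req: RegisterRequest): Either[String, List[String]] =
      if (req.modules.isEmpty) Left("no modules declared")
      else {
        registry ++= req.modules.map { case (m, h) => s"${req.namespace}.$m" -> h }
        Right(req.modules.keys.toList.sorted.map(m => s"${req.namespace}.$m"))
      }

    // Steps 4-5: a pipeline call is forwarded to the provider's handler.
    def execute(qualifiedName: String, input: Map[String, String]): Either[String, Map[String, String]] =
      registry.get(qualifiedName).toRight(s"unknown module $qualifiedName").map(h => h(input))
  }
}
```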

SDK Quick Start

1. Define a Module

import io.constellation.{CType, CValue}
import io.constellation.provider.sdk._
import cats.effect.IO

val analyzeModule = ModuleDefinition(
  name = "Analyze",
  inputType = CType.CProduct(Map("text" -> CType.CString)),
  outputType = CType.CProduct(Map(
    "sentiment" -> CType.CFloat,
    "confidence" -> CType.CFloat
  )),
  version = "1.0.0",
  description = "Sentiment analysis",
  handler = { input =>
    // Extract the "text" field (throws if the input does not have the declared shape)
    val text = input.asInstanceOf[CValue.CProduct].values("text")
      .asInstanceOf[CValue.CString].value
    // Stub: a real handler would score `text`; this one returns fixed values
    IO.pure(CValue.CProduct(Map(
      "sentiment" -> CValue.CFloat(0.85),
      "confidence" -> CValue.CFloat(0.92)
    )))
  }
)
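The handler above uses `asInstanceOf` and `values("text")`, which throw `ClassCastException` or `NoSuchElementException` on malformed input. A pattern-matching alternative reports an error instead of throwing. The exact case-class shapes of `CValue` are not shown here, so this sketch uses a small local ADT (`Val`, `VString`, `VFloat`, `VProduct`) as a hypothetical stand-in. Adapt the patterns to the real types.

```scala
object SafeDecode {
  // Hypothetical stand-in for CValue; the real ADT has more cases.
  sealed trait Val
  final case class VString(value: String) extends Val
  final case class VFloat(value: Double) extends Val
  final case class VProduct(values: Map[String, Val]) extends Val

  // Pull a string field out of a product, describing what went wrong if it can't.
  def stringField(input: Val, field: String): Either[String, String] =
    input match {
      case VProduct(values) =>
        values.get(field) match {
          case Some(VString(s)) => Right(s)
          case Some(other)      => Left(s"field '$field' is not a string: $other")
          case None             => Left(s"missing field '$field'")
        }
      case other => Left(s"expected a product, got $other")
    }

  // Same fixed outputs as the Analyze stub, but only after the input decodes.
  def analyze(input: Val): Either[String, Val] =
    stringField(input, "text").map { _ =>
      VProduct(Map("sentiment" -> VFloat(0.85), "confidence" -> VFloat(0.92)))
    }
}
```

In a real handler, a `Left` could be turned into a failed `IO` (for example with `IO.raiseError`), so the caller sees a clear message rather than a cast failure.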

2. Create and Start Provider

import io.constellation.provider.sdk._
import io.constellation.provider.{JsonCValueSerializer, GrpcProviderTransport}
import io.grpc.ManagedChannelBuilder
import cats.effect.{IO, IOApp}

object MyProvider extends IOApp.Simple {
  def run: IO[Unit] =
    for {
      provider <- ConstellationProvider.create(
        namespace = "ml",
        instances = List("localhost:9090"), // Constellation server's gRPC port
        config = SdkConfig(),
        // Build one transport per server instance from its "host:port" address
        transportFactory = { addr =>
          val Array(host, port) = addr.split(":")
          val channel = ManagedChannelBuilder
            .forAddress(host, port.toInt).usePlaintext().build() // no TLS
          new GrpcProviderTransport(channel)
        },
        executorServerFactory = new GrpcExecutorServerFactory(),
        serializer = JsonCValueSerializer
      )
      _ <- provider.register(analyzeModule) // analyzeModule from step 1
      _ <- provider.start.useForever        // keep the provider running until stopped
    } yield ()
}
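The transportFactory above destructures `addr.split(":")` into exactly two parts, so an address without a colon (or with a scheme such as `grpc://`) fails with a `MatchError`, and a non-numeric port fails in `toInt`. A more defensive parser can be sketched as follows. `parseHostPort` is a hypothetical helper, not an SDK API. It rejects scheme prefixes because the server expects bare `host:port`.

```scala
object HostPort {
  // Parse "host:port"; hypothetical helper, not part of the provider SDK.
  def parseHostPort(addr: String): Either[String, (String, Int)] =
    if (addr.contains("://")) Left(s"drop the scheme prefix: $addr")
    else {
      // Split on the last ':' so only the final segment is treated as the port.
      val idx = addr.lastIndexOf(':')
      if (idx <= 0 || idx == addr.length - 1) Left(s"expected host:port, got $addr")
      else {
        val host = addr.substring(0, idx)
        addr.substring(idx + 1).toIntOption match {
          case Some(p) if p > 0 && p <= 65535 => Right((host, p))
          case _                              => Left(s"invalid port in $addr")
        }
      }
    }
}
```

Inside the factory, a `Left` can be turned into an exception with a readable message, which is easier to diagnose than a bare `MatchError`.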

3. Use in constellation-lang

in text: String
result = ml.Analyze(text)
out result

Configuration

SDK (Provider-Side)

import io.constellation.provider.sdk._
import scala.concurrent.duration._ // enables 5.seconds, 1.second, ...

val config = SdkConfig(
  executorPort = 9091,            // Port for receiving ExecuteRequests
  heartbeatInterval = 5.seconds,
  reconnectBackoff = 1.second,
  maxReconnectBackoff = 60.seconds,
  maxReconnectAttempts = 10,
  groupId = None                  // Set for horizontal scaling
)

| Parameter | Default | Description |
|---|---|---|
| executorPort | 9091 | Port the provider listens on |
| heartbeatInterval | 5s | Heartbeat frequency |
| reconnectBackoff | 1s | Initial reconnection delay |
| maxReconnectBackoff | 60s | Maximum reconnection delay |
| maxReconnectAttempts | 10 | Max consecutive reconnect failures |
| groupId | None | Shared group ID for horizontal scaling |
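The table gives the initial and maximum reconnect delay but not how the delay grows in between. The sketch below assumes the delay doubles after each failed attempt and is capped at maxReconnectBackoff. That policy is an assumption, and the SDK may add jitter or use a different curve. `Backoff.schedule` is a hypothetical helper.

```scala
import scala.concurrent.duration._

object Backoff {
  // Delay before each reconnect attempt, assuming doubling capped at `max` (hypothetical policy).
  def schedule(initial: FiniteDuration, max: FiniteDuration, attempts: Int): List[FiniteDuration] = {
    def cap(d: FiniteDuration): FiniteDuration = if (d > max) max else d
    List.iterate(cap(initial), attempts)(d => cap(d * 2L))
  }
}
```

With the defaults (1s, 60s, 10 attempts), this model yields delays of 1, 2, 4, 8, 16, 32, 60, 60, 60, 60 seconds, which is 303 seconds of waiting before the provider gives up.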

Server-Side

| Parameter | Default | Env Variable |
|---|---|---|
| grpcPort | 9090 | CONSTELLATION_PROVIDER_PORT |
| heartbeatTimeout | 15s | CONSTELLATION_PROVIDER_HEARTBEAT_TIMEOUT |
| controlPlaneRequiredTimeout | 30s | CONSTELLATION_PROVIDER_CONTROL_PLANE_TIMEOUT |
| reservedNamespaces | stdlib | CONSTELLATION_PROVIDER_RESERVED_NS |
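Resolving these settings amounts to "read the variable, fall back to the default." The sketch below takes the environment as a `Map[String, String]` (pass `sys.env` in practice) so it stays testable. `ProviderEnvConfig` is hypothetical and is not the real ProviderManagerConfig loader. The value formats are also assumptions: timeouts as whole seconds, and reserved namespaces as a comma-separated list.

```scala
import scala.concurrent.duration._

object ProviderEnv {
  // Hypothetical mirror of the server-side settings table.
  final case class ProviderEnvConfig(
      grpcPort: Int,
      heartbeatTimeout: FiniteDuration,
      controlPlaneRequiredTimeout: FiniteDuration,
      reservedNamespaces: Set[String]
  )

  // Assumes a bare number of seconds; missing or unparsable values fall back to the default.
  private def seconds(env: Map[String, String], key: String, default: FiniteDuration): FiniteDuration =
    env.get(key).flatMap(_.toLongOption).map(_.seconds).getOrElse(default)

  def load(env: Map[String, String]): ProviderEnvConfig =
    ProviderEnvConfig(
      grpcPort = env.get("CONSTELLATION_PROVIDER_PORT").flatMap(_.toIntOption).getOrElse(9090),
      heartbeatTimeout = seconds(env, "CONSTELLATION_PROVIDER_HEARTBEAT_TIMEOUT", 15.seconds),
      controlPlaneRequiredTimeout = seconds(env, "CONSTELLATION_PROVIDER_CONTROL_PLANE_TIMEOUT", 30.seconds),
      reservedNamespaces = env.get("CONSTELLATION_PROVIDER_RESERVED_NS")
        .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSet)
        .getOrElse(Set("stdlib"))
    )
}
```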

Server-Side Setup

import io.constellation.provider.{JsonCValueSerializer, ModuleProviderManager, ProviderManagerConfig}
// ConstellationImpl and LangCompiler must also be imported from their own modules

for {
  constellation <- ConstellationImpl.builder().build()
  compiler      <- LangCompiler.builder.build
  manager       <- ModuleProviderManager(
    delegate = constellation,
    compiler = compiler,
    config = ProviderManagerConfig(),
    serializer = JsonCValueSerializer
  )
} yield manager
// manager is a Constellation that also accepts gRPC registrations on port 9090 (default grpcPort)

Combined HTTP + gRPC Server

Pass the manager (together with the compiler) to ConstellationServer.builder to run both services:

for {
  constellation <- ConstellationImpl.builder().build()
  compiler      <- LangCompiler.builder.build
  manager       <- ModuleProviderManager(
    delegate = constellation,
    compiler = compiler,
    config = ProviderManagerConfig(),
    serializer = JsonCValueSerializer
  )
  // HTTP API on port 8080, gRPC provider service on port 9090
  _ <- ConstellationServer.builder(manager, compiler).run
} yield ()

ModuleProviderManager implements Constellation, so it is a drop-in wrapper around the delegate. In-process modules added via setModule still work alongside external, gRPC-provided modules.

Type System Mapping

| CType | Protobuf |
|---|---|
| CString | PrimitiveType.STRING |
| CInt | PrimitiveType.INT |
| CFloat | PrimitiveType.FLOAT |
| CBoolean | PrimitiveType.BOOL |
| CProduct | RecordType |
| CList | ListType |
| CMap | MapType |
| COptional | OptionType |
| CUnion | UnionType |

Conversion is handled by TypeSchemaConverter. Round-trip fidelity: toCType(toTypeSchema(t)) == t for all supported types.
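The round-trip property holds because the mapping is one-to-one. The sketch below reproduces the idea with two local ADTs: `T` stands in for CType and `Schema` stands in for the protobuf TypeSchema. Constructor shapes, such as `CUnion` holding a map of named variants, are assumptions for illustration. This is not TypeSchemaConverter's actual code. `toCType` returns an `Either` because a schema received over the wire can contain something unknown.

```scala
object SchemaRoundTrip {
  // Hypothetical stand-in for CType.
  sealed trait T
  case object CString extends T
  case object CInt extends T
  case object CFloat extends T
  case object CBoolean extends T
  final case class CProduct(fields: Map[String, T]) extends T
  final case class CList(elem: T) extends T
  final case class CMap(key: T, value: T) extends T
  final case class COptional(inner: T) extends T
  final case class CUnion(variants: Map[String, T]) extends T

  // Hypothetical stand-in for the protobuf TypeSchema messages.
  sealed trait Schema
  final case class Primitive(kind: String) extends Schema // "STRING" | "INT" | "FLOAT" | "BOOL"
  final case class RecordType(fields: Map[String, Schema]) extends Schema
  final case class ListType(elem: Schema) extends Schema
  final case class MapType(key: Schema, value: Schema) extends Schema
  final case class OptionType(inner: Schema) extends Schema
  final case class UnionType(variants: Map[String, Schema]) extends Schema

  def toTypeSchema(t: T): Schema = t match {
    case CString      => Primitive("STRING")
    case CInt         => Primitive("INT")
    case CFloat       => Primitive("FLOAT")
    case CBoolean     => Primitive("BOOL")
    case CProduct(fs) => RecordType(fs.map { case (k, v) => k -> toTypeSchema(v) })
    case CList(e)     => ListType(toTypeSchema(e))
    case CMap(k, v)   => MapType(toTypeSchema(k), toTypeSchema(v))
    case COptional(i) => OptionType(toTypeSchema(i))
    case CUnion(vs)   => UnionType(vs.map { case (k, v) => k -> toTypeSchema(v) })
  }

  def toCType(s: Schema): Either[String, T] = s match {
    case Primitive("STRING") => Right(CString)
    case Primitive("INT")    => Right(CInt)
    case Primitive("FLOAT")  => Right(CFloat)
    case Primitive("BOOL")   => Right(CBoolean)
    case Primitive(other)    => Left(s"unknown primitive $other")
    case RecordType(fs)      => fields(fs).map(CProduct(_))
    case ListType(e)         => toCType(e).map(CList(_))
    case MapType(k, v)       => for { kk <- toCType(k); vv <- toCType(v) } yield CMap(kk, vv)
    case OptionType(i)       => toCType(i).map(COptional(_))
    case UnionType(vs)       => fields(vs).map(CUnion(_))
  }

  // Convert every entry, stopping at the first failure.
  private def fields(fs: Map[String, Schema]): Either[String, Map[String, T]] =
    fs.foldLeft[Either[String, Map[String, T]]](Right(Map.empty)) { case (acc, (k, v)) =>
      for { m <- acc; t <- toCType(v) } yield m + (k -> t)
    }
}
```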

Horizontal Scaling

Multiple providers can serve the same namespace by sharing a groupId:

val config = SdkConfig(groupId = Some("ml-pool"))

Provider A (group_id: "ml-pool") ──┐
Provider B (group_id: "ml-pool") ──┼── namespace: "ml" → round-robin
Provider C (group_id: "ml-pool") ──┘
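Round-robin dispatch across a group can be sketched as a rotating index into the group's executors. `RoundRobinPool` is a hypothetical illustration of the strategy, not the ExecutorPool implementation. It assumes a fixed member list. The atomic counter lets concurrent callers each claim a distinct slot.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical round-robin selector over a fixed set of group members.
final class RoundRobinPool[A](members: Vector[A]) {
  private val counter = new AtomicLong(0L)

  // Next member in rotation, or None if the group is empty.
  def next(): Option[A] =
    if (members.isEmpty) None
    else {
      // getAndIncrement is atomic; floorMod keeps the index valid even after overflow.
      val i = java.lang.Math.floorMod(counter.getAndIncrement(), members.size.toLong).toInt
      Some(members(i))
    }
}
```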

Rules:

  • All group members must register the same modules with compatible schemas
  • Solo providers (no groupId) have exclusive namespace ownership
  • Solo and group cannot coexist in the same namespace
  • Last group member disconnecting releases the namespace
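Taken together, the four rules act as a small per-namespace state machine. The sketch below encodes them with hypothetical types (`Owner`, `Solo`, `Group`, `NamespaceTable`). It models ownership only and does not check that group members declare compatible schemas.

```scala
object Ownership {
  sealed trait Owner
  final case class Solo(providerId: String) extends Owner
  final case class Group(groupId: String, members: Set[String]) extends Owner

  final case class NamespaceTable(owners: Map[String, Owner]) {
    // groupId = None means a solo provider.
    def register(ns: String, providerId: String, groupId: Option[String]): Either[String, NamespaceTable] =
      (owners.get(ns), groupId) match {
        case (None, None)    => Right(copy(owners = owners + (ns -> Solo(providerId))))
        case (None, Some(g)) => Right(copy(owners = owners + (ns -> Group(g, Set(providerId)))))
        case (Some(Group(g, ms)), Some(g2)) if g == g2 =>
          Right(copy(owners = owners + (ns -> Group(g, ms + providerId))))
        // Solo owner, different group, or solo/group mix: rejected.
        case (Some(_), _) => Left(s"namespace '$ns' is owned by another provider or group")
      }

    // The namespace is released when its solo owner or the last group member leaves.
    def disconnect(ns: String, providerId: String): NamespaceTable =
      owners.get(ns) match {
        case Some(Solo(p)) if p == providerId => copy(owners = owners - ns)
        case Some(Group(g, ms)) if ms.contains(providerId) =>
          val rest = ms - providerId
          if (rest.isEmpty) copy(owners = owners - ns)
          else copy(owners = owners + (ns -> Group(g, rest)))
        case _ => this
      }
  }
}
```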

Namespace Rules

Dot-separated identifiers: ml, data.transform, company.service

| Rule | Example |
|---|---|
| Each segment starts with a letter | ml (ok), 2ml (rejected) |
| Letters, digits, and underscores only | my_service (ok), my-service (rejected) |
| Case-sensitive | ML != ml |
| No reserved prefixes | stdlib.foo (rejected) |
| Exclusive ownership | One provider (or group) per namespace |
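The syntax rules reduce to one regular expression per segment, plus a reserved-prefix check. `validNamespace` is a hypothetical helper. The precise pattern the server uses, and how it treats a bare `stdlib`, are assumptions here.

```scala
object NamespaceRules {
  // A segment starts with a letter and continues with letters, digits, or underscores.
  private val Segment = "[A-Za-z][A-Za-z0-9_]*".r

  def validNamespace(ns: String, reserved: Set[String] = Set("stdlib")): Either[String, String] = {
    // Limit -1 keeps empty segments, so "ml." and "a..b" are rejected.
    val segments = ns.split("\\.", -1).toList
    if (!segments.forall(s => Segment.pattern.matcher(s).matches()))
      Left(s"invalid namespace syntax: '$ns'")
    else if (reserved.exists(r => ns == r || ns.startsWith(r + ".")))
      Left(s"reserved namespace: '$ns'")
    else Right(ns)
  }
}
```

Because the comparison is plain string matching, `ML` and `ml` are accepted as two distinct namespaces, which matches the case-sensitivity rule.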

Validation Checks

The server validates every Register request:

| Check | Rejection Reason |
|---|---|
| Namespace syntax | Invalid dot-separated identifiers |
| Namespace ownership | Namespace owned by another provider |
| Reserved namespace | stdlib prefix is protected |
| Executor URL format | Must be host:port (no scheme prefix) |
| Module name format | Must start with a letter; letters, digits, and underscores only |
| Type schema validity | Input/output schemas must be well-formed |

Connection Lifecycle

Provider                          Server
│                                 │
├── Register ───────────────────> │ Validates, creates ExternalModule
│                                 │
├── ControlPlane (stream) ──────> │ Transitions to Active
│                                 │
├── Heartbeat ──────────────────> │ Updates lastHeartbeatAt
│ <───────────────── HeartbeatAck │
│                                 │
│ <───────────────── DrainRequest │ (graceful shutdown)
├── DrainAck ───────────────────> │
│                                 │
├── Deregister ─────────────────> │ Removes modules
If heartbeats stop arriving, the server automatically deregisters the provider's modules once heartbeatTimeout (15s by default) has elapsed.
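The expiry check itself is timestamp arithmetic. A minimal sketch follows. It assumes the server compares the time since `lastHeartbeatAt` against `heartbeatTimeout`; the sweep interval and clock source are not documented here, and `isExpired` is a hypothetical helper.

```scala
import java.time.{Duration, Instant}

object Liveness {
  // Expired once more than `heartbeatTimeout` has passed since the last heartbeat.
  def isExpired(lastHeartbeatAt: Instant, now: Instant, heartbeatTimeout: Duration): Boolean =
    Duration.between(lastHeartbeatAt, now).compareTo(heartbeatTimeout) > 0
}
```

With the defaults (5s heartbeatInterval, 15s heartbeatTimeout), the timeout spans three heartbeat intervals, so a single dropped heartbeat does not cause deregistration.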

Canary Rollout

Safely upgrade modules across connected instances:

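import cats.effect.unsafe.implicits.global // runtime required by unsafeRunSync()

// newModules: the updated module definitions to roll out
val result = provider.canaryRollout(newModules).unsafeRunSync()
// CanaryResult: Promoted | RolledBack(reason) | PartialFailure(promoted, failed)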

The rollout proceeds one instance at a time. For each instance it waits for an observation window, checks health, and then either promotes the new version or rolls it back.
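The loop can be sketched as a walk over instances that stops at the first unhealthy one. This is a hypothetical, synchronous model. `deploy`, `healthy`, and `rollback` are caller-supplied functions, and the observation window is elided. The meaning given to each outcome is an assumption about how the three CanaryResult cases are used, not documented SDK behavior: RolledBack if the first instance fails, PartialFailure if earlier instances were already promoted.

```scala
object Canary {
  sealed trait Result
  case object Promoted extends Result
  final case class RolledBack(reason: String) extends Result
  final case class PartialFailure(promoted: List[String], failed: List[String]) extends Result

  // Upgrade instances one at a time; stop at the first failed health check.
  def rollout(
      instances: List[String],
      deploy: String => Unit,
      healthy: String => Boolean,
      rollback: String => Unit
  ): Result = {
    @annotation.tailrec
    def loop(remaining: List[String], promoted: List[String]): Result =
      remaining match {
        case Nil => Promoted
        case inst :: rest =>
          deploy(inst)
          // A real rollout would wait for the observation window before checking.
          if (healthy(inst)) loop(rest, promoted :+ inst)
          else {
            rollback(inst)
            if (promoted.isEmpty) RolledBack(s"$inst failed its health check")
            else PartialFailure(promoted, List(inst))
          }
      }
    loop(instances, Nil)
  }
}
```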

Common Pitfalls

| Pitfall | Solution |
|---|---|
| Using grpc://host:port as the executor URL | Use host:port only (no scheme prefix) |
| Connecting to the HTTP port (8080) instead of the gRPC port (9090) | Module Provider uses its own gRPC port |
| Forgetting new for GrpcExecutorServerFactory | It's a class, not a case class: new GrpcExecutorServerFactory() |
| GrpcProviderTransport.apply(host, port) returns a Resource | Use the constructor with a ManagedChannel for non-Resource usage |
| Namespace not persisted across server restarts | Providers must re-register after a server restart |

Key Source Files

| Component | Module | Path |
|---|---|---|
| SDK entry point | module-provider-sdk | io.constellation.provider.sdk.ConstellationProvider |
| Module definition | module-provider-sdk | io.constellation.provider.sdk.ModuleDefinition |
| CValue serialization | module-provider-sdk | io.constellation.provider.CValueSerializer |
| Type conversion | module-provider-sdk | io.constellation.provider.TypeSchemaConverter |
| Transport traits | module-provider-sdk | io.constellation.provider.sdk.transport |
| Server orchestrator | module-provider | io.constellation.provider.ModuleProviderManager |
| Schema validation | module-provider | io.constellation.provider.SchemaValidator |
| Connection lifecycle | module-provider | io.constellation.provider.ControlPlaneManager |
| Load balancing | module-provider | io.constellation.provider.ExecutorPool |
| TypeScript SDK | sdks/typescript | sdks/typescript/src/provider/constellation-provider.ts |
| TS CValue serialization | sdks/typescript | sdks/typescript/src/serialization/cvalue-serializer.ts |
| TS type conversion | sdks/typescript | sdks/typescript/src/serialization/type-schema-converter.ts |
| TS transport | sdks/typescript | sdks/typescript/src/transport/transport.ts |
