DAG Composition Patterns
This guide covers proven patterns for composing effective Constellation pipelines. Understanding these patterns helps you write pipelines that are clear, maintainable, and performant.
Constellation automatically detects parallelism from your pipeline structure. When operations don't depend on each other, they run concurrently. Focus on expressing your logic clearly — the DAG compiler handles optimization.
Table of Contents
- Parallel vs Sequential Patterns
- Fan-Out and Fan-In Patterns
- Conditional Branching Patterns
- List Processing Patterns
- Pipeline Organization Best Practices
- Variable Naming Conventions
- Performance Optimization Strategies
- Anti-Patterns to Avoid
Parallel vs Sequential Patterns
Sequential Pattern
Operations that depend on each other form a sequential chain. Each step waits for the previous step to complete.
# Sequential pipeline - each step depends on the previous
in text: String
# Step 1: Clean the text
trimmed = Trim(text)
# Step 2: Convert to lowercase (depends on trimmed)
normalized = Lowercase(trimmed)
# Step 3: Count words (depends on normalized)
wordCount = WordCount(normalized)
out wordCount
DAG Visualization:
text
↓
Trim → trimmed
↓
Lowercase → normalized
↓
WordCount → wordCount
When to use:
- Each operation transforms the output of the previous operation
- The order of operations matters for correctness
- Later operations depend on the results of earlier ones
Parallel Pattern
Operations that use the same inputs but don't depend on each other execute concurrently.
# Parallel pipeline - independent operations on same input
in text: String
# All three execute in parallel (no dependencies between them)
wordCount = WordCount(text)
charCount = CharCount(text)
lineCount = LineCount(text)
out wordCount
out charCount
out lineCount
DAG Visualization:
text
┌────┼────┐
↓ ↓ ↓
WordCount CharCount LineCount
↓ ↓ ↓
wordCount charCount lineCount
When to use:
- Multiple independent analyses or transformations on the same data
- No operation needs the output of another
- Want to minimize total latency (parallel execution is faster than sequential)
Mixed Pattern
Real pipelines often combine sequential and parallel patterns.
# Mixed pattern - combination of sequential and parallel
in rawText: String
# Sequential preprocessing
cleaned = Trim(rawText)
normalized = Lowercase(cleaned)
# Parallel analysis on the normalized text
wordCount = WordCount(normalized)
sentenceCount = SentenceCount(normalized)
avgWordLength = AverageWordLength(normalized)
# Sequential aggregation (depends on all parallel results)
summary = FormatSummary(wordCount, sentenceCount, avgWordLength)
out summary
DAG Visualization:
rawText
↓
Trim → cleaned
↓
Lowercase → normalized
┌─────┼─────┐
↓ ↓ ↓
WordCount SentenceCount AverageWordLength
↓ ↓ ↓
wordCount sentenceCount avgWordLength
└─────┼─────┘
↓
FormatSummary → summary
Best practice:
- Preprocess sequentially to prepare data
- Fan out to parallel analysis
- Aggregate results sequentially
Fan-Out and Fan-In Patterns
Basic Fan-Out
Fan-out distributes work across multiple independent operations. The DAG compiler automatically parallelizes.
# Basic fan-out pattern
in userId: String
in productId: String
in sessionId: String
# Fan-out: three independent service calls execute in parallel
userProfile = FetchUserProfile(userId)
productDetails = FetchProduct(productId)
sessionData = FetchSession(sessionId)
# All three outputs available concurrently
out userProfile
out productDetails
out sessionData
DAG Visualization:
userId productId sessionId
↓ ↓ ↓
FetchUserProfile FetchProduct FetchSession
↓ ↓ ↓
userProfile productDetails sessionData
Performance characteristic: Total latency is max(operation1, operation2, operation3), not the sum.
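Constellation parallelizes this automatically, but the latency characteristic can be sketched outside the DSL. A minimal Python asyncio analogue (the fetch functions and their delays are invented for illustration):

```python
import asyncio
import time

# Hypothetical stand-ins for the three service calls; delays are arbitrary.
async def fetch_user_profile(user_id):
    await asyncio.sleep(0.1)
    return {"userId": user_id}

async def fetch_product(product_id):
    await asyncio.sleep(0.2)
    return {"productId": product_id}

async def fetch_session(session_id):
    await asyncio.sleep(0.3)
    return {"sessionId": session_id}

async def fan_out():
    # Independent awaitables run concurrently, like independent DAG nodes.
    return await asyncio.gather(
        fetch_user_profile("u1"),
        fetch_product("p1"),
        fetch_session("s1"),
    )

start = time.monotonic()
profile, product, session = asyncio.run(fan_out())
elapsed = time.monotonic() - start

# Elapsed time tracks the slowest call (~0.3s), not the 0.6s sum.
print(f"completed in {elapsed:.2f}s")
```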
Fan-Out with Fan-In
Fan-in merges the results from multiple parallel operations.
# Fan-out/fan-in pattern
type Profile = { name: String, email: String }
type Activity = { score: Int, lastSeen: String }
type Preferences = { theme: String, language: String }
in userId: String
# Fan-out: parallel data fetching
profile = FetchProfile(userId)
activity = FetchActivity(userId)
prefs = FetchPreferences(userId)
# Fan-in: merge all data into one record
combined = profile + activity + prefs
# Project specific fields
summary = combined[name, score, theme]
out summary
DAG Visualization:
userId
┌────┼────┐
↓ ↓ ↓
FetchProfile FetchActivity FetchPreferences
↓ ↓ ↓
profile activity prefs
└────┼────┘
↓
merge (+) → combined
↓
project ([]) → summary
When to use:
- Need data from multiple independent sources
- Want to combine results into a single output
- Each source can be queried in parallel
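The merge (+) and projection ([]) steps behave like record merging and field selection. A rough Python analogue using dicts, with field names taken from the example above (note: later-operand-wins on key collisions is Python dict behavior and may differ from Constellation's merge rule):

```python
# Results of the three fetches, as plain dicts standing in for records.
profile = {"name": "Alice", "email": "alice@example.com"}
activity = {"score": 42, "lastSeen": "2024-01-01"}
prefs = {"theme": "dark", "language": "en"}

# Fan-in: merge (+) combines the records into one.
combined = {**profile, **activity, **prefs}

# Projection ([name, score, theme]) keeps only the named fields.
summary = {k: combined[k] for k in ("name", "score", "theme")}

print(summary)  # {'name': 'Alice', 'score': 42, 'theme': 'dark'}
```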
Selective Fan-In
Not all parallel branches need to merge. Select what you need.
# Selective fan-in
in requestId: String
# Fan-out to multiple services
profile = SlowService(requestId)
metadata = FastService(requestId)
analytics = OptionalService(requestId)
# Fan-in: use only some results
response = profile + metadata
out response
out analytics # Separate output, not merged
DAG Visualization:
requestId
┌──────┼──────┐
↓ ↓ ↓
SlowService FastService OptionalService
↓ ↓ ↓
profile metadata analytics
└──────┘ ↓
↓ (separate output)
merge (+) → response
Resilient Fan-Out
Add resilience to individual branches with fallbacks, retries, and timeouts.
# Resilient fan-out pattern
in userId: String
in fallbackProfile: String
# Each branch has its own resilience strategy
profile = FlakyService(userId) with
retry: 3,
timeout: 2s,
fallback: fallbackProfile
activity = SlowApiCall(userId) with
cache: 5min,
timeout: 3s
preferences = ReliableService(userId) with
cache: 1h
# Fan-in with partial results
result = profile + activity + preferences
out result
When to use:
- Working with unreliable external services
- Different services have different SLAs
- Want partial results when some services fail
By default, if one branch fails, sibling branches are cancelled and the error propagates. Use fallback or on_error: log on individual operations to allow partial results.
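The retry/timeout/fallback clauses can be pictured as wrappers around a single branch. A minimal asyncio sketch, with a hypothetical flaky service that fails twice before succeeding (the wrapper is an illustration of the semantics, not Constellation's implementation):

```python
import asyncio

class Flaky:
    """Hypothetical service that fails twice before succeeding."""
    def __init__(self):
        self.calls = 0

    async def fetch(self, user_id):
        self.calls += 1
        if self.calls < 3:
            raise ConnectionError("transient failure")
        return f"profile:{user_id}"

async def with_resilience(call, *, retries, timeout, fallback):
    # Retry up to `retries` attempts, each bounded by `timeout` seconds;
    # if every attempt fails, return the fallback instead of raising.
    for _ in range(retries):
        try:
            return await asyncio.wait_for(call(), timeout)
        except (ConnectionError, asyncio.TimeoutError):
            continue
    return fallback

service = Flaky()
result = asyncio.run(with_resilience(
    lambda: service.fetch("u1"), retries=3, timeout=2.0, fallback="default-profile"
))
print(result)  # profile:u1 — the third attempt succeeds
```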
Conditional Branching Patterns
Guard Pattern
Use guards to conditionally compute values. Guards produce Optional<T> types.
# Guard pattern - conditional computation
in score: Int
in tier: String
# Guarded values (Optional<String>)
premiumOffer = "Free upgrade!" when tier == "premium"
highScoreBonus = "Bonus: 500 points" when score > 80
# Unwrap optionals with fallback
offerMessage = premiumOffer ?? "Standard service"
bonusMessage = highScoreBonus ?? "No bonus"
out offerMessage
out bonusMessage
DAG Visualization:
tier score
↓ ↓
└───┬───┘
↓
guard (when) → premiumOffer (Optional<String>)
↓
coalesce (??) → offerMessage
score
↓
guard (when) → highScoreBonus (Optional<String>)
↓
coalesce (??) → bonusMessage
When to use:
- Value only makes sense under certain conditions
- Want explicit handling of "no value" cases
- Building priority-based fallback chains
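The `when`/`??` semantics can be sketched in Python, with `None` playing the role of an empty Optional (the helper names are illustrative, not part of any API):

```python
def guard(value, condition):
    # `value when condition`: present only if the condition holds.
    return value if condition else None

def coalesce(*options):
    # `a ?? b ?? ...`: the first present value wins.
    return next((o for o in options if o is not None), None)

score, tier = 85, "standard"

premium_offer = guard("Free upgrade!", tier == "premium")   # absent here
high_score_bonus = guard("Bonus: 500 points", score > 80)   # present

offer_message = coalesce(premium_offer, "Standard service")
bonus_message = coalesce(high_score_bonus, "No bonus")

print(offer_message, "|", bonus_message)
# Standard service | Bonus: 500 points
```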
Branch Pattern
Use branch for exhaustive classification where every input maps to exactly one output.
# Branch pattern - multi-way classification
in score: Int
# Classify into exactly one tier
tier = branch {
score >= 90 -> "platinum",
score >= 70 -> "gold",
score >= 50 -> "silver",
otherwise -> "bronze"
}
# Route based on multiple conditions
route = branch {
score >= 95 -> "vip-lane",
score >= 80 -> "express-lane",
score >= 50 -> "standard-lane",
otherwise -> "bulk-lane"
}
out tier
out route
DAG Visualization:
score
┌────┼────┐
↓ ↓
branch branch
(tier) (route)
↓ ↓
tier route
When to use:
- Classify input into one of N categories
- Need exactly one result (not optional)
- Decision tree / routing logic
Use branch for exhaustive classification (always get a result). Use guard + ?? for optional enrichment (might not get a value).
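A branch evaluates to the first matching arm, and `otherwise` guarantees a result. The tier classification above corresponds to a first-match condition chain, sketched here in Python:

```python
def classify_tier(score):
    # First matching condition wins; `otherwise` guarantees a result,
    # so the return type is a plain String, never an Optional.
    if score >= 90:
        return "platinum"
    if score >= 70:
        return "gold"
    if score >= 50:
        return "silver"
    return "bronze"  # otherwise

print([classify_tier(s) for s in (95, 75, 55, 10)])
# ['platinum', 'gold', 'silver', 'bronze']
```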
Combined Guard and Branch
Guards and branches compose naturally for complex conditional logic.
# Combined conditional pattern
in score: Int
in tier: String
in subscriptionActive: Boolean
# Branch for base routing
baseRoute = branch {
tier == "enterprise" -> "enterprise-queue",
tier == "premium" -> "premium-queue",
otherwise -> "standard-queue"
}
# Guard for conditional upgrades
fastTrack = "fast-track-enabled" when score > 90 and subscriptionActive
priority = "priority-upgrade" when score > 75 and tier == "premium"
# Priority chain: try fast-track, then priority, then base route
finalRoute = fastTrack ?? priority ?? baseRoute
out finalRoute
DAG Visualization:
tier score subscriptionActive
↓ ↓ ↓
└─────┼───────────┘
↓
branch → baseRoute
↓
guard → fastTrack (Optional)
↓
guard → priority (Optional)
↓
coalesce chain (??) → finalRoute
List Processing Patterns
Filter Pattern
Filter lists to keep only elements matching a predicate.
# Filter pattern
use stdlib.collection
use stdlib.compare
in numbers: List<Int>
in threshold: Int
# Keep only numbers above threshold
above = filter(numbers, (x) => gt(x, threshold))
# Chain multiple filters
positives = filter(numbers, (x) => gt(x, 0))
large = filter(positives, (x) => gt(x, 100))
out above
out large
DAG Visualization:
numbers threshold
↓ ↓
└────────┘
↓
filter (above threshold) → above
numbers
↓
filter (positives) → positives
↓
filter (large) → large
Best practice: Filter early to reduce the dataset before expensive operations.
Map Pattern
Transform each element in a list.
# Map pattern
use stdlib.collection
use stdlib.math
in values: List<Int>
in multiplier: Int
# Transform each element
doubled = map(values, (x) => multiply(x, 2))
scaled = map(values, (x) => multiply(x, multiplier))
out doubled
out scaled
When to use:
- Apply same transformation to all elements
- Convert list of one type to list of another type
Filter-Map Pattern
Combine filtering and transformation for efficient processing.
# Filter-map pattern - filter first, then transform
use stdlib.collection
use stdlib.compare
use stdlib.math
in numbers: List<Int>
in threshold: Int
in multiplier: Int
# Step 1: Filter (reduce dataset size)
filtered = filter(numbers, (x) => gt(x, threshold))
# Step 2: Map (transform only remaining elements)
scaled = map(filtered, (x) => multiply(x, multiplier))
out scaled
DAG Visualization:
numbers threshold
↓ ↓
└────────┘
↓
filter → filtered
↓
multiplier
↓
map → scaled
Performance tip: filter before you map. map(filter(list, pred), fn) transforms only the elements that survive the filter, whereas filter(map(list, fn), pred) transforms every element first (and requires a predicate written against the transformed values).
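Counting transformation calls makes the savings concrete. A Python sketch with an invented `expensive_transform` (the counter just records how many elements get transformed under each ordering):

```python
calls = {"n": 0}

def expensive_transform(x):
    calls["n"] += 1  # track how many elements get transformed
    return x * 10

numbers = list(range(100))
threshold = 90

# Filter first: only the 9 surviving elements reach the transform.
calls["n"] = 0
scaled = [expensive_transform(x) for x in numbers if x > threshold]
filter_first_calls = calls["n"]

# Map first: all 100 elements are transformed, then most are discarded.
calls["n"] = 0
scaled_late = [y for y in (expensive_transform(x) for x in numbers)
               if y > threshold * 10]
map_first_calls = calls["n"]

print(filter_first_calls, map_first_calls)  # 9 100
```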
Reduce Pattern
Aggregate a list into a single value.
# Reduce pattern - aggregating lists
in numbers: List<Int>
# Multiple independent aggregations (run in parallel)
total = SumList(numbers)
average = Average(numbers)
maximum = Max(numbers)
minimum = Min(numbers)
count = ListLength(numbers)
out total
out average
out maximum
out minimum
out count
DAG Visualization:
numbers
┌──┬──┼──┬──┐
↓ ↓ ↓ ↓ ↓
SumList Average Max Min ListLength
↓ ↓ ↓ ↓ ↓
total average maximum minimum count
When to use:
- Need summary statistics
- Want multiple aggregations in parallel
- Converting collection to scalar values
All/Any Pattern
Check if all or any elements satisfy a condition.
# All/Any pattern - validation
use stdlib.collection
use stdlib.compare
in scores: List<Int>
in minScore: Int
# Check predicates over the entire list
allPassing = all(scores, (x) => gte(x, minScore))
anyFailing = any(scores, (x) => lt(x, minScore))
allPositive = all(scores, (x) => gt(x, 0))
# Conditional processing based on validation
summary = ProcessScores(scores) when allPassing
out allPassing
out anyFailing
out summary
When to use:
- Validate batch constraints before processing
- Gate expensive operations on collection properties
- Implement business rules on collections
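Gating expensive work on a batch-level check maps directly onto Python's built-in `all`/`any`. A sketch with an invented `process_scores` standing in for the expensive operation, and `None` standing in for the guarded (absent) summary:

```python
def process_scores(scores):
    # Stand-in for an expensive batch operation.
    return sum(scores) / len(scores)

def gated_summary(scores, min_score):
    # Validate the whole batch before paying for processing.
    all_passing = all(s >= min_score for s in scores)
    any_failing = any(s < min_score for s in scores)
    summary = process_scores(scores) if all_passing else None  # `when` guard
    return all_passing, any_failing, summary

print(gated_summary([80, 90, 100], min_score=70))  # (True, False, 90.0)
print(gated_summary([80, 40], min_score=70))       # (False, True, None)
```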
Complete List Processing Pipeline
Combine filter, map, and reduce for comprehensive data processing.
# Complete list processing pipeline
use stdlib.collection
use stdlib.compare
use stdlib.math
in numbers: List<Int>
in threshold: Int
in multiplier: Int
# Step 1: Filter early
filtered = filter(numbers, (x) => gt(x, threshold))
# Step 2: Transform
scaled = map(filtered, (x) => multiply(x, multiplier))
# Step 3: Aggregate (multiple operations in parallel)
total = SumList(scaled)
avg = Average(scaled)
count = ListLength(scaled)
# Step 4: Format results
formattedTotal = FormatNumber(total)
out total
out avg
out count
out formattedTotal
DAG Visualization:
numbers threshold
↓ ↓
└────────┘
↓
filter → filtered
↓
multiplier
↓
map → scaled
┌───┼───┐
↓ ↓ ↓
SumList Average ListLength
↓ ↓ ↓
total avg count
↓
FormatNumber → formattedTotal
Pipeline Organization Best Practices
1. Organize by Concern
Group related operations into logical sections with comments.
# Well-organized pipeline
in rawData: String
# ===== Input Validation =====
trimmed = Trim(rawData)
isValid = ValidateFormat(trimmed)
validated = trimmed when isValid
# ===== Data Extraction =====
normalized = Lowercase(validated ?? "")
tokens = Tokenize(normalized)
entities = ExtractEntities(normalized)
# ===== Feature Engineering =====
tokenCount = ListLength(tokens)
entityCount = ListLength(entities)
avgTokenLength = AverageTokenLength(tokens)
# ===== Scoring =====
qualityScore = ComputeQuality(tokenCount, entityCount)
finalScore = qualityScore * avgTokenLength
out finalScore
out entities
Benefits:
- Easy to understand pipeline flow
- Clear responsibilities for each section
- Easier to maintain and extend
2. Use Descriptive Variable Names
Variables should describe the data they contain, not the operation that created them.
# Good: descriptive names
cleanedText = Trim(rawText)
normalizedText = Lowercase(cleanedText)
wordCount = WordCount(normalizedText)
# Bad: operation-focused names
trimResult = Trim(rawText)
lowercaseResult = Lowercase(trimResult)
countResult = WordCount(lowercaseResult)
3. Extract Intermediate Values
Don't nest deep expressions. Extract intermediate values for clarity.
# Good: clear intermediate steps
in text: String
trimmed = Trim(text)
normalized = Lowercase(trimmed)
wordCount = WordCount(normalized)
isLong = wordCount > 100
embeddings = ComputeEmbeddings(normalized) when isLong
out embeddings
# Bad: deeply nested
embeddings = ComputeEmbeddings(Lowercase(Trim(text))) when WordCount(Lowercase(Trim(text))) > 100
Benefits:
- Easier to debug (can inspect intermediate values)
- Clearer data dependencies for the DAG compiler
- More readable and maintainable
4. Declare Types for Complex Records
Define record types at the top of your pipeline for complex data structures.
# Good: explicit type declarations
type UserProfile = {
id: String,
name: String,
email: String,
score: Int
}
type Enrichment = {
timestamp: String,
source: String,
campaign: String
}
in user: UserProfile
in enrichment: Enrichment
enrichedUser = user + enrichment
out enrichedUser
# Acceptable for simple cases
in simpleRecord: { id: String, value: Int }
5. Separate Data Flow from Control Flow
Keep data transformations and conditional logic separate for clarity.
# Good: separation of concerns
in rawScore: Int
in tier: String
# Data transformations
normalizedScore = rawScore / 100
cappedScore = if (normalizedScore > 1.0) 1.0 else normalizedScore
# Control flow decisions
isPremium = tier == "premium"
isHighScore = cappedScore > 0.8
# Conditional outputs
bonus = "Premium bonus applied" when isPremium and isHighScore
finalScore = if (bonus != None) cappedScore + 0.1 else cappedScore
out finalScore
out bonus
Variable Naming Conventions
Naming Patterns by Type
| Data Type | Naming Pattern | Examples |
|---|---|---|
| Boolean | is, has, should, can prefix | isValid, hasError, shouldRetry, canProcess |
| Count/Length | count, length, size suffix; num prefix | wordCount, listLength, batchSize, numItems |
| Transformed Data | Descriptive adjective + noun | cleanedText, normalizedScore, enrichedUser |
| Aggregates | Operation + noun | totalRevenue, averageScore, maxValue, minThreshold |
| Collections | Plural nouns | users, scores, items, results |
| Records | Singular nouns | user, profile, request, response |
Good Naming Examples
# Boolean variables
isAuthenticated = ValidateToken(token)
hasPermission = CheckPermission(userId, resource)
shouldCache = responseTime > 100
canRetry = attemptCount < 3
# Counts and sizes
wordCount = WordCount(text)
itemCount = ListLength(items)
batchSize = 100
# Transformed data
cleanedText = Trim(rawText)
normalizedEmail = Lowercase(email)
enrichedProfile = profile + metadata
scaledScores = map(scores, (x) => multiply(x, 2))
# Aggregates
totalRevenue = SumList(revenues)
averageScore = Average(scores)
maxValue = Max(values)
highestRated = First(SortDescending(items))
# Collections vs. Records
users = FetchUsers(query) # Collection (plural)
user = SelectFirst(users) # Single record (singular)
activeUsers = filter(users, (u) => u.active) # Collection
Avoid These Naming Patterns
# ❌ Avoid: Generic names
result1 = Process(data)
result2 = Transform(result1)
temp = Filter(result2)
# ✅ Better: Descriptive names
validatedData = Process(data)
normalizedData = Transform(validatedData)
filteredData = Filter(normalizedData)
# ❌ Avoid: Abbreviations
usrProf = FetchUser(id)
procRes = ProcessRequest(req)
# ✅ Better: Full words
userProfile = FetchUser(id)
processedResponse = ProcessRequest(request)
# ❌ Avoid: Type information in names
stringName = GetName(user)
intScore = GetScore(user)
# ✅ Better: Semantic meaning
userName = GetName(user)
userScore = GetScore(user)
Performance Optimization Strategies
1. Filter Early, Project Late
Reduce dataset size as early as possible, but keep fields you might need.
# Optimized: filter early
in items: List<Item>
in threshold: Int
# Step 1: Filter to reduce dataset
aboveThreshold = filter(items, (x) => gt(x.value, threshold))
# Step 2: Expensive operations on smaller dataset
enriched = EnrichItems(aboveThreshold)
scored = ScoreItems(enriched)
# Step 3: Project at the end (once you know what you need)
final = scored[id, score, category]
out final
Why: Processing fewer items through expensive operations is faster.
2. Leverage Automatic Parallelism
Structure independent operations to run in parallel.
# Optimized: parallel independent operations
in userId: String
# These run in parallel automatically
profile = FetchProfile(userId)
orders = FetchOrders(userId)
preferences = FetchPreferences(userId)
recommendations = ComputeRecommendations(userId)
# Fan-in happens after all parallel work completes
summary = profile + orders + preferences
out summary
out recommendations
Why: Total latency = max(operations), not sum(operations).
3. Cache Expensive Operations
Add caching to reduce redundant computation.
# Optimized: strategic caching
in query: String
# Cache expensive lookups
embeddings = ComputeEmbeddings(query) with cache: 1h
similarDocs = FindSimilar(embeddings) with cache: 30min
# Don't cache fast operations
formatted = FormatResults(similarDocs)
out formatted
When to cache:
- Expensive computations (ML models, embeddings)
- External API calls
- Operations with deterministic results
When NOT to cache:
- Fast operations (string manipulation)
- Non-deterministic operations
- Real-time data that changes frequently
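For deterministic operations, the effect of `cache:` resembles memoization. A rough Python sketch with `functools.lru_cache` (the embedding function is invented, and `lru_cache` has no duration-based expiry, so the `1h` TTL has no direct analogue here):

```python
from functools import lru_cache

calls = {"n": 0}

@lru_cache(maxsize=None)
def compute_embeddings(query: str) -> tuple:
    # Stand-in for an expensive, deterministic computation.
    calls["n"] += 1
    return tuple(ord(c) for c in query)

# Repeated identical queries hit the cache instead of recomputing.
first = compute_embeddings("hello")
second = compute_embeddings("hello")
third = compute_embeddings("world")

print(calls["n"])  # 2 — "hello" was computed once, "world" once
```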
4. Use Resilience Patterns Strategically
Add retries and timeouts where they matter most.
# Optimized: selective resilience
in requestId: String
in defaultCritical: String
# Critical path: aggressive resilience
criticalData = FetchCritical(requestId) with
retry: 3,
timeout: 1s,
fallback: defaultCritical
# Non-critical path: lighter resilience
enrichmentData = FetchEnrichment(requestId) with
timeout: 500ms,
on_error: log
# No resilience needed for fast, reliable operations
formatted = FormatResponse(criticalData)
out formatted
out enrichmentData
5. Batch Operations When Possible
Process collections in batches instead of individual items.
# Optimized: batch processing
type Item = { id: String, value: Int }
type Context = { userId: String }
in items: Candidates<Item>
in context: Context
# Single batch operation (efficient)
enriched = items + context
# Single batch projection
final = enriched[id, value, userId]
out final
# ❌ Avoid: processing items individually in loops
# Use Candidates<T> and let the runtime batch automatically
6. Conditional Execution for Expensive Operations
Use guards to avoid unnecessary computation.
# Optimized: conditional expensive operations
in text: String
in mode: String
# Only compute embeddings when needed
needsEmbeddings = mode == "semantic"
embeddings = ComputeExpensiveEmbeddings(text) when needsEmbeddings
# Fallback to cheaper alternative
keywords = ExtractKeywords(text)
# Use embeddings if available, otherwise keywords
searchTerms = embeddings ?? keywords
out searchTerms
Anti-Patterns to Avoid
❌ Anti-Pattern 1: Over-Parallelization
Problem: Creating too many tiny parallel operations that add coordination overhead.
# ❌ Bad: over-parallelization
in text: String
# Each operation is very fast - parallelism adds overhead
char1 = GetChar(text, 0)
char2 = GetChar(text, 1)
char3 = GetChar(text, 2)
char4 = GetChar(text, 3)
combined = char1 + char2 + char3 + char4
Solution: Group related fast operations sequentially.
# ✅ Good: appropriate granularity
in text: String
substring = Substring(text, 0, 4)
out substring
Rule of thumb: Parallelize operations that take >10ms. For faster operations, sequential execution may be more efficient.
❌ Anti-Pattern 2: Unnecessary Dependencies
Problem: Creating artificial dependencies that prevent parallelism.
# ❌ Bad: unnecessary sequential execution
in userId: String
profile = FetchProfile(userId)
_ = LogFetch("profile") # Artificial dependency
orders = FetchOrders(userId)
_ = LogFetch("orders") # Artificial dependency
prefs = FetchPreferences(userId)
Solution: Remove unnecessary dependencies.
# ✅ Good: independent operations
in userId: String
# All three fetch in parallel
profile = FetchProfile(userId)
orders = FetchOrders(userId)
prefs = FetchPreferences(userId)
# Log after all complete (if needed)
_ = LogCompletion("all-fetched")
❌ Anti-Pattern 3: Premature Optimization
Problem: Adding complexity for unproven performance gains.
# ❌ Bad: premature caching and complexity
in query: String
# Caching everything "just in case"
tokens = Tokenize(query) with cache: 1h
normalized = Normalize(tokens) with cache: 1h
cleaned = Clean(normalized) with cache: 1h
formatted = Format(cleaned) with cache: 1h
out formatted
Solution: Start simple, measure, then optimize.
# ✅ Good: simple first
in query: String
tokens = Tokenize(query)
normalized = Normalize(tokens)
cleaned = Clean(normalized)
formatted = Format(cleaned)
out formatted
# Add caching later if profiling shows bottlenecks
Best practice: Write clear code first. Profile to find actual bottlenecks. Optimize only where measurements show benefit.
❌ Anti-Pattern 4: Deep Expression Nesting
Problem: Deeply nested expressions are hard to read and debug.
# ❌ Bad: deep nesting
out FormatOutput(
Aggregate(
Transform(
Filter(
Normalize(
Clean(rawData)
)
)
)
)
)
Solution: Extract intermediate values with descriptive names.
# ✅ Good: clear intermediate steps
cleaned = Clean(rawData)
normalized = Normalize(cleaned)
filtered = Filter(normalized)
transformed = Transform(filtered)
aggregated = Aggregate(transformed)
output = FormatOutput(aggregated)
out output
❌ Anti-Pattern 5: Magic Numbers and Strings
Problem: Hardcoded values make pipelines inflexible and hard to understand.
# ❌ Bad: magic numbers
in score: Int
tier = branch {
score >= 90 -> "platinum",
score >= 70 -> "gold",
score >= 50 -> "silver",
otherwise -> "bronze"
}
bonus = 100 when score > 85
Solution: Use named inputs for thresholds and configuration.
# ✅ Good: parameterized
@example(90)
in platinumThreshold: Int
@example(70)
in goldThreshold: Int
@example(50)
in silverThreshold: Int
@example(85)
in bonusThreshold: Int
@example(100)
in bonusAmount: Int
in score: Int
tier = branch {
score >= platinumThreshold -> "platinum",
score >= goldThreshold -> "gold",
score >= silverThreshold -> "silver",
otherwise -> "bronze"
}
bonus = bonusAmount when score > bonusThreshold
❌ Anti-Pattern 6: Ignoring Type Safety
Problem: Using stringly-typed data instead of structured records.
# ❌ Bad: stringly-typed
in userData: String # JSON string '{"id": "123", "name": "Alice"}'
# Parsing and extracting manually
userId = ExtractUserId(userData)
userName = ExtractUserName(userData)
Solution: Use structured record types.
# ✅ Good: type-safe
type User = {
id: String,
name: String,
email: String
}
in user: User
# Direct field access (compile-time validated)
userId = user.id
userName = user.name
out userId
out userName
❌ Anti-Pattern 7: Duplicate Logic
Problem: Repeating the same computation multiple times.
# ❌ Bad: duplicate computation
in text: String
wordCount1 = WordCount(Lowercase(Trim(text)))
wordCount2 = WordCount(Lowercase(Trim(text)))
isLong = WordCount(Lowercase(Trim(text))) > 100
Solution: Compute once, reuse the result.
# ✅ Good: compute once
in text: String
trimmed = Trim(text)
normalized = Lowercase(trimmed)
wordCount = WordCount(normalized)
isLong = wordCount > 100
out wordCount
out isLong
❌ Anti-Pattern 8: Overly Complex Conditionals
Problem: Nested conditionals become hard to reason about.
# ❌ Bad: complex nested conditionals
result = if (a) (if (b) (if (c) x else y) else z) else w
Solution: Use branch expressions or extract intermediate booleans.
# ✅ Good: branch expression
result = branch {
a and b and c -> x,
a and b -> y,
a -> z,
otherwise -> w
}
# ✅ Also good: named intermediates
isCase1 = a and b and c
isCase2 = a and b
isCase3 = a
result = branch {
isCase1 -> x,
isCase2 -> y,
isCase3 -> z,
otherwise -> w
}
Visual DAG Examples
Example 1: E-Commerce Order Processing
# E-commerce order processing pipeline
type Order = { orderId: String, userId: String, total: Int }
type User = { userId: String, tier: String }
in order: Order
in user: User
# Parallel validation and enrichment
orderValid = ValidateOrder(order)
userValid = ValidateUser(user)
inventory = CheckInventory(order.orderId)
# Sequential processing after validation
allValid = orderValid and userValid
processOrder = ProcessPayment(order) when allValid
# Parallel post-processing
invoice = GenerateInvoice(order) when processOrder != None
notification = SendNotification(user) when processOrder != None
analytics = LogAnalytics(order, user)
out processOrder
out invoice
out notification
DAG Visualization:
order user
↓ ↓
├─────┼─────┐
↓ ↓ ↓
ValidateOrder ValidateUser CheckInventory
↓ ↓ ↓
orderValid userValid inventory
└──┬──┘
↓
allValid (and)
↓
ProcessPayment (when allValid) → processOrder
↓
┌──┼──┐
↓ ↓ ↓
GenerateInvoice SendNotification LogAnalytics
↓ ↓ ↓
invoice notification analytics
Example 2: Content Recommendation Pipeline
# Content recommendation pipeline
in userId: String
in contextTags: List<String>
# Fan-out: parallel data gathering
userHistory = FetchHistory(userId) with cache: 5min
userPrefs = FetchPreferences(userId) with cache: 1h
trendingContent = FetchTrending() with cache: 10min
# Parallel analysis
historyEmbeddings = ComputeEmbeddings(userHistory)
contextEmbeddings = ComputeEmbeddings(contextTags)
# Similarity search (depends on embeddings)
historySimilar = FindSimilar(historyEmbeddings)
contextSimilar = FindSimilar(contextEmbeddings)
# Merge and rank
allCandidates = historySimilar + contextSimilar
rankedResults = RankByRelevance(allCandidates, userPrefs)
# Filter and format
topResults = TakeTop(rankedResults, 10)
formatted = FormatRecommendations(topResults)
out formatted
DAG Visualization:
userId contextTags
↓ ↓
├───┬─────┼───┐
↓ ↓ ↓ ↓
FetchHistory FetchPreferences FetchTrending ComputeEmbeddings
↓ ↓ ↓ ↓ (contextTags)
│ │ │ ComputeEmbeddings ↓
│ │ │ (userHistory) contextEmbeddings
│ │ │ ↓ ↓
│ │ │ historyEmbeddings FindSimilar
│ │ │ ↓ ↓
│ │ │ FindSimilar contextSimilar
│ │ │ ↓ │
│ │ │ historySimilar │
│ │ │ └───────┬───────────┘
│ │ │ ↓
│ │ │ allCandidates (+)
│ │ └─────────────────┼───────────┐
│ │ ↓ │
│ └──────────────────RankByRelevance │
│ ↓ │
│ rankedResults │
│ ↓ │
│ TakeTop │
│ ↓ │
│ topResults │
│ ↓ │
└────────────────────FormatRecommendations
↓
formatted
Summary
Key Takeaways
- Automatic Parallelism: Focus on expressing logic clearly. The DAG compiler optimizes execution.
- Pattern Selection:
  - Use sequential for data transformations where order matters
  - Use parallel for independent operations
  - Use fan-out/fan-in for aggregating data from multiple sources
  - Use guards for optional values with fallbacks
  - Use branches for exhaustive classification
- Organization:
  - Group related operations
  - Use descriptive variable names
  - Extract intermediate values
  - Separate data flow from control flow
- Performance:
  - Filter early, project late
  - Leverage automatic parallelism
  - Cache strategically
  - Use resilience where it matters
- Avoid Anti-Patterns:
  - Don't over-parallelize tiny operations
  - Don't create artificial dependencies
  - Don't prematurely optimize
  - Don't nest expressions deeply
  - Don't hardcode magic numbers
  - Don't ignore type safety
Further Reading
- Fan-Out/Fan-In Cookbook — Practical fan-out patterns
- Conditional Branching Cookbook — Guards, coalesce, and branches
- Data Pipeline Cookbook — Multi-step data processing
- Orchestration Algebra — Boolean-based control flow
- Lambdas and HOF Cookbook — List processing patterns
Remember: Well-structured pipelines are clear, maintainable, and performant. Let the DAG compiler handle optimization while you focus on expressing your domain logic clearly.