Deterministic Replay
Romancy uses a deterministic replay mechanism to ensure workflows never lose progress. This document explains how workflows are resumed after interruption and how state is restored.
Overview
Romancy’s replay mechanism has three key characteristics:
- Completed activities are skipped: Already-executed activities return cached results from history
- Workflow code runs fully: Control flow and calculations between activities execute every time
- State restoration from history: Workflow state is restored from the persisted execution history
How Replay Works
Activity Execution Flow
When an activity is called (on first execution or during replay):
- Activity ID Resolution: Auto-generated (function_name:counter) or explicitly provided
- Cache Check: If replaying, check whether a result is cached for this activity_id
- Return Cached: If found, return the cached result without executing the function
- Execute: If not cached, run the function and record the result
- Error Handling: Failed activities are recorded with full error details
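The five steps above can be modelled in a few lines. The following is a minimal, self-contained sketch, not Romancy's implementation: activityCtx and execute are hypothetical names, results are plain strings instead of typed structs, and the cache is an in-memory map standing in for history loaded from the database.

```go
package main

import (
	"errors"
	"fmt"
)

// activityCtx is a hypothetical stand-in for a workflow context. It models only
// the steps listed above and is not Romancy's WorkflowContext.
type activityCtx struct {
	replaying bool
	counters  map[string]int    // per-activity-name call counters
	cache     map[string]string // activity_id -> result loaded from history
	recorded  map[string]string // activity_id -> result recorded in this run
	failed    map[string]string // activity_id -> error message
}

func newActivityCtx(replaying bool, cache map[string]string) *activityCtx {
	return &activityCtx{replaying: replaying, counters: map[string]int{}, cache: cache,
		recorded: map[string]string{}, failed: map[string]string{}}
}

// execute resolves the activity ID, consults the cache, and otherwise runs fn.
func execute(ctx *activityCtx, name, explicitID string, fn func() (string, error)) (string, error) {
	id := explicitID
	if id == "" { // step 1: auto-generate "name:counter"
		ctx.counters[name]++
		id = fmt.Sprintf("%s:%d", name, ctx.counters[name])
	}
	if ctx.replaying { // steps 2-3: a cached result short-circuits execution
		if cached, ok := ctx.cache[id]; ok {
			return cached, nil
		}
	}
	result, err := fn() // step 4: run the activity body
	if err != nil {     // step 5: record failure details
		ctx.failed[id] = err.Error()
		return "", err
	}
	ctx.recorded[id] = result
	return result, nil
}

func main() {
	calls := 0
	reserve := func() (string, error) { calls++; return "R-new", nil }
	ctx := newActivityCtx(true, map[string]string{"reserve_inventory:1": "R123"})
	first, _ := execute(ctx, "reserve_inventory", "", reserve)  // served from cache
	second, _ := execute(ctx, "reserve_inventory", "", reserve) // no cache entry: executed
	fmt.Println(first, second, calls)                            // R123 R-new 1
	_, err := execute(ctx, "charge_card", "", func() (string, error) { return "", errors.New("card declined") })
	fmt.Println(err, ctx.failed["charge_card:1"])
}
```

Note that the counter advances even when the result comes from the cache; that is what keeps the second call's ID ("reserve_inventory:2") stable across replays.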
Activity ID Patterns
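Activities are identified by unique IDs in the format function_name:counter, where function_name is the name registered with romancy.DefineActivity (e.g. "reserve_inventory") and counter increments with each call to that activity.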
Sequential Execution (Auto-generated IDs):
// First call: auto-generates "reserve_inventory:1"
inventory, err := reserveInventory.Execute(ctx, orderID)
if err != nil {
return nil, err
}
// Second call: auto-generates "reserve_inventory:2"
backupInventory, err := reserveInventory.Execute(ctx, backupOrderID)
if err != nil {
return nil, err
}
Conditional Execution (Auto-generated IDs):
// Execution order is deterministic, so auto-generated IDs work fine
if requiresApproval {
result, err = approveOrder.Execute(ctx, orderID) // Auto: "approve_order:1"
} else {
result, err = autoApprove.Execute(ctx, orderID) // Auto: "auto_approve:1"
}
Loop Execution (Auto-generated IDs):
// Execution order is deterministic (same order every replay)
for _, item := range items {
_, err := processItem.Execute(ctx, item) // Auto: "process_item:1", "process_item:2", ...
if err != nil {
return nil, err
}
}
Concurrent Execution (Manual IDs Required):
// Execution order is non-deterministic, so manual IDs are required
import "golang.org/x/sync/errgroup"
type ProcessResult struct {
Status string `json:"status"`
Data string `json:"data"`
}
var eg errgroup.Group
var results [3]ProcessResult
eg.Go(func() error {
r, err := processA.Execute(ctx, data, romancy.WithActivityID("process_a:1"))
results[0] = r
return err
})
eg.Go(func() error {
r, err := processB.Execute(ctx, data, romancy.WithActivityID("process_b:1"))
results[1] = r
return err
})
eg.Go(func() error {
r, err := processC.Execute(ctx, data, romancy.WithActivityID("process_c:1"))
results[2] = r
return err
})
if err := eg.Wait(); err != nil {
return ConcurrentResult{}, err
}
How Replay Works Internally
When an activity is called:
- Resolve Activity ID: Auto-generate, or use the ID passed via romancy.WithActivityID
- Check Replay Mode: If ctx.IsReplaying() is true, check the cache
- Cache Lookup: Look for a cached result using activity_id as the key
- Return or Execute: Return the cached result if found, otherwise execute the function
- Record Result: Save the result to the database with its activity_id for future replay
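Recording a result and restoring it later implies a serialization round trip. The sketch below assumes results are stored as JSON in event_data (as the workflow_history table further down suggests) and decoded back into the activity's result type on replay. historyRecord, resultEnvelope, recordResult and decodeResult are hypothetical names, and the "input" field shown in that table is omitted.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ReservationResult matches the result type used in the example that follows.
type ReservationResult struct {
	ReservationID string `json:"reservation_id"`
	Status        string `json:"status"`
}

// historyRecord mirrors the workflow_history columns (instance_id, activity_id,
// event_type, event_data); the Go type itself is hypothetical.
type historyRecord struct {
	InstanceID string
	ActivityID string
	EventType  string
	EventData  []byte
}

// resultEnvelope is an assumed shape for event_data.
type resultEnvelope[T any] struct {
	ActivityName string `json:"activity_name"`
	Result       T      `json:"result"`
}

// recordResult serializes a completed activity's result into a history row.
func recordResult[T any](instanceID, activityID, name string, result T) (historyRecord, error) {
	data, err := json.Marshal(resultEnvelope[T]{ActivityName: name, Result: result})
	if err != nil {
		return historyRecord{}, err
	}
	return historyRecord{InstanceID: instanceID, ActivityID: activityID,
		EventType: "ActivityCompleted", EventData: data}, nil
}

// decodeResult restores the typed result from a stored row during replay.
func decodeResult[T any](rec historyRecord) (T, error) {
	var env resultEnvelope[T]
	err := json.Unmarshal(rec.EventData, &env)
	return env.Result, err
}

func main() {
	rec, err := recordResult("order-abc123", "reserve_inventory:1", "reserve_inventory",
		ReservationResult{ReservationID: "R123", Status: "reserved"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(rec.EventData))
	restored, err := decodeResult[ReservationResult](rec)
	fmt.Println(restored.ReservationID, err)
}
```

Under this assumption, the json struct tags on result types are what make replayed results identical to the originals.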
Example
package main
import (
"context"
"github.com/i2y/romancy"
)
type ReservationResult struct {
ReservationID string `json:"reservation_id"`
Status string `json:"status"`
}
type PaymentResult struct {
TransactionID string `json:"transaction_id"`
Status string `json:"status"`
}
type ShippingResult struct {
TrackingNumber string `json:"tracking_number"`
}
type OrderWorkflowResult struct {
Status string `json:"status"`
}
var reserveInventory = romancy.DefineActivity("reserve_inventory",
func(ctx context.Context, orderID string) (ReservationResult, error) {
// Business logic here
return ReservationResult{ReservationID: "R123", Status: "reserved"}, nil
},
)
var processPayment = romancy.DefineActivity("process_payment",
func(ctx context.Context, orderID string) (PaymentResult, error) {
// Business logic here
return PaymentResult{TransactionID: "T456", Status: "completed"}, nil
},
)
var arrangeShipping = romancy.DefineActivity("arrange_shipping",
func(ctx context.Context, orderID string) (ShippingResult, error) {
// Business logic here
return ShippingResult{TrackingNumber: "TRACK789"}, nil
},
)
var orderWorkflow = romancy.DefineWorkflow("order_workflow",
func(ctx *romancy.WorkflowContext, orderID string) (OrderWorkflowResult, error) {
// Activity IDs are auto-generated for sequential calls
_, err := reserveInventory.Execute(ctx, orderID)
if err != nil {
return OrderWorkflowResult{}, err
}
_, err = processPayment.Execute(ctx, orderID)
if err != nil {
return OrderWorkflowResult{}, err
}
_, err = arrangeShipping.Execute(ctx, orderID)
if err != nil {
return OrderWorkflowResult{}, err
}
return OrderWorkflowResult{Status: "completed"}, nil
},
)
If the workflow crashed after processing payment, replay proceeds as follows:
- reserveInventory: Skipped (cached result {"reservation_id": "R123", ...} returned)
- processPayment: Skipped (cached result {"transaction_id": "T456", ...} returned)
- arrangeShipping: Executed (no cache available, runs normally)
What Gets Replayed
✅ Always Executed (Every Replay)
- Variable calculations and assignments
- Control flow (if, for, switch statements)
- Function calls (non-activity)
- Local variable operations
- Workflow function code from start to finish
❌ Never Re-executed (Cached)
- Completed activity business logic
Example
type BalanceCheckResult struct {
Sufficient bool `json:"sufficient"`
Balance float64 `json:"balance"`
}
type TransactionResult struct {
Amount float64 `json:"amount"`
Fee float64 `json:"fee"`
}
type ComplexWorkflowResult struct {
Status string `json:"status"`
}
var complexWorkflow = romancy.DefineWorkflow("complex_workflow",
func(ctx *romancy.WorkflowContext, amount float64) (ComplexWorkflowResult, error) {
// This code executes every time (including replay)
tax := amount * 0.1
total := amount + tax
fmt.Printf("Total calculated: %.2f\n", total) // Prints on every replay!
// Activity is skipped during replay (cached)
result1, err := checkBalance.Execute(ctx, total)
if err != nil {
return ComplexWorkflowResult{}, err
}
// This if statement is evaluated every time
if result1.Sufficient {
// This activity is also skipped during replay
result2, err := processTransaction.Execute(ctx, total)
if err != nil {
return ComplexWorkflowResult{}, err
}
// This calculation executes every time
finalAmount := result2.Amount - result2.Fee
// This activity is also skipped during replay
_, err = sendReceipt.Execute(ctx, finalAmount)
if err != nil {
return ComplexWorkflowResult{}, err
}
} else {
// This branch is also evaluated every time
_, err := sendRejection.Execute(ctx, "Insufficient balance")
if err != nil {
return ComplexWorkflowResult{}, err
}
}
return ComplexWorkflowResult{Status: "completed"}, nil
},
)
During replay:
- tax and total calculations execute every time
- fmt.Printf() executes every time (the line may appear multiple times in logs)
- checkBalance is skipped; result1 comes from the cache
- if result1.Sufficient is evaluated every time
- processTransaction is skipped; result2 comes from the cache
- finalAmount calculation executes every time
- sendReceipt is skipped
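The split between code that re-runs and activities that do not can be observed by running the same workflow twice against one result store. This sketch is illustrative only: cachedCall is a hypothetical stand-in for Activity.Execute, amounts are integer cents to keep the arithmetic exact, and the balance and fee values are made up.

```go
package main

import "fmt"

// observer counts what actually runs on each pass (illustrative only).
type observer struct {
	workflowPasses int
	activityRuns   map[string]int
}

// cachedCall runs fn only when no result for key has been recorded yet.
func cachedCall(cache map[string]int, obs *observer, key string, fn func() int) int {
	if v, ok := cache[key]; ok {
		return v // replay: the activity body is skipped
	}
	obs.activityRuns[key]++
	v := fn()
	cache[key] = v
	return v
}

// complexWorkflow loosely mirrors the example above.
func complexWorkflow(cache map[string]int, obs *observer, amountCents int) string {
	obs.workflowPasses++                   // ordinary workflow code: runs on every pass
	total := amountCents + amountCents/10 // recomputed on every pass
	balance := cachedCall(cache, obs, "check_balance:1", func() int { return 50000 })
	if balance >= total { // re-evaluated on every pass, using the cached balance
		fee := cachedCall(cache, obs, "process_transaction:1", func() int { return 100 })
		return fmt.Sprintf("charged %d, fee %d", total, fee)
	}
	return "rejected"
}

func main() {
	cache := map[string]int{}
	obs := &observer{activityRuns: map[string]int{}}
	first := complexWorkflow(cache, obs, 10000)  // initial execution
	replay := complexWorkflow(cache, obs, 10000) // replay against recorded results
	fmt.Println(first, "|", replay)
	fmt.Println("workflow passes:", obs.workflowPasses, "activity runs:", obs.activityRuns)
}
```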
History and Caching
Data Flow
First execution:
Activity executes → Result saved to DB → Available for replay
Replay:
Load history from DB → Populate cache → Return cached results
What Gets Stored
Romancy persists activity results, along with received events and other history events, to the workflow_history table:
| instance_id | activity_id | event_type | event_data |
|---|---|---|---|
| order-abc123 | reserve_inventory:1 | ActivityCompleted | {"activity_name": "reserve_inventory", "result": {"reservation_id": "R123"}, "input": {...}} |
| order-abc123 | process_payment:1 | ActivityCompleted | {"activity_name": "process_payment", "result": {"transaction_id": "T456"}, "input": {...}} |
| order-abc123 | wait_event_payment.completed:1 | EventReceived | {"event_data": {...}} |
Event Types:
- ActivityCompleted: Successful activity execution
- ActivityFailed: Activity returned an error (includes error type and message)
- EventReceived: Event received via WaitEvent()
- TimerExpired: Timer expired via Sleep()
How Cache Works
On replay, Romancy:
- Loads all history from the database for this workflow instance
- Populates an in-memory cache keyed by activity_id
- Returns cached results without re-executing activities
Example cache after loading history:
map[string]any{
"reserve_inventory:1": map[string]any{"reservation_id": "R123", "status": "reserved"},
"process_payment:1": map[string]any{"transaction_id": "T456", "status": "completed"},
}
This ensures workflows resume exactly where they left off, even after crashes.
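Building that cache amounts to filtering one instance's history and indexing it by activity_id. The sketch below is an illustration under assumptions: historyRow and buildCache are hypothetical, payloads are kept as raw JSON, and ActivityFailed and TimerExpired rows are simply ignored, although a real engine needs a policy for them.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// historyRow is a hypothetical in-memory copy of one workflow_history row.
type historyRow struct {
	InstanceID string
	ActivityID string
	EventType  string
	EventData  string // JSON, as in the event_data column
}

// buildCache keeps one instance's rows and keys reusable payloads by activity_id.
func buildCache(rows []historyRow, instanceID string) (map[string]json.RawMessage, error) {
	cache := make(map[string]json.RawMessage)
	for _, row := range rows {
		if row.InstanceID != instanceID {
			continue // history of other instances is never mixed in
		}
		var payload struct {
			Result    json.RawMessage `json:"result"`
			EventData json.RawMessage `json:"event_data"`
		}
		if err := json.Unmarshal([]byte(row.EventData), &payload); err != nil {
			return nil, fmt.Errorf("decode %s: %w", row.ActivityID, err)
		}
		switch row.EventType {
		case "ActivityCompleted":
			cache[row.ActivityID] = payload.Result
		case "EventReceived":
			cache[row.ActivityID] = payload.EventData
		default:
			// ActivityFailed and TimerExpired rows are ignored in this sketch.
		}
	}
	return cache, nil
}

func main() {
	rows := []historyRow{
		{"order-abc123", "reserve_inventory:1", "ActivityCompleted", `{"activity_name":"reserve_inventory","result":{"reservation_id":"R123"}}`},
		{"order-abc123", "process_payment:1", "ActivityCompleted", `{"activity_name":"process_payment","result":{"transaction_id":"T456"}}`},
		{"order-xyz789", "reserve_inventory:1", "ActivityCompleted", `{"result":{"reservation_id":"R999"}}`},
	}
	cache, err := buildCache(rows, "order-abc123")
	if err != nil {
		panic(err)
	}
	for id, result := range cache {
		fmt.Println(id, string(result))
	}
}
```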
ReceivedEvent Reconstruction
Events received via WaitEvent() are automatically reconstructed from stored data, preserving CloudEvents metadata (type, source, time, etc.).
Determinism Guarantees
✅ Best Practices
1. Hide non-deterministic operations in activities:
var getCurrentTime = romancy.DefineActivity("get_current_time",
func(ctx context.Context) (string, error) {
return time.Now().Format(time.RFC3339), nil
},
)
type TimestampResult struct {
Timestamp string `json:"timestamp"`
}
var myWorkflow = romancy.DefineWorkflow("my_workflow",
func(ctx *romancy.WorkflowContext) (TimestampResult, error) {
// Replay will use the same timestamp
timestamp, err := getCurrentTime.Execute(ctx)
if err != nil {
return TimestampResult{}, err
}
return TimestampResult{Timestamp: timestamp}, nil
},
)
2. Random values should be activities:
var generateID = romancy.DefineActivity("generate_id",
func(ctx context.Context) (string, error) {
return uuid.New().String(), nil
},
)
3. External API calls should be activities (recommended):
type APIData struct {
ID string `json:"id"`
Value string `json:"value"`
Status string `json:"status"`
}
var callExternalAPI = romancy.DefineActivity("call_external_api",
func(ctx context.Context) (APIData, error) {
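// Propagate the activity's context so cancellation reaches the HTTP call
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://api.example.com/data", nil)
if err != nil {
return APIData{}, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return APIData{}, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return APIData{}, fmt.Errorf("unexpected status: %s", resp.Status)
}
var data APIData
// Decoding errors must be returned, otherwise a zero value would be recorded as the result
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
return APIData{}, err
}
return data, nil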
},
)
var myWorkflow = romancy.DefineWorkflow("my_workflow",
func(ctx *romancy.WorkflowContext) (APIData, error) {
// Benefits of making it an activity:
// - Not re-executed on replay (the result comes from history)
// - Easy to test (can be mocked)
// - Recorded in history
// - No repeated network calls on every replay
data, err := callExternalAPI.Execute(ctx)
if err != nil {
return APIData{}, err
}
return data, nil
},
)
❌ Anti-Patterns
type SomeActivityResult struct {
Status string `json:"status"`
}
var badWorkflow = romancy.DefineWorkflow("bad_workflow",
func(ctx *romancy.WorkflowContext) (SomeActivityResult, error) {
// ❌ Direct time access in workflow (different on replay)
timestamp := time.Now()
// First run: 2025-01-01 10:00:00
// Replay: 2025-01-01 10:05:00 ← Different!
// ❌ Random value generation in workflow (different on replay)
requestID := uuid.New().String()
// First run: "abc-123"
// Replay: "def-456" ← Different!
// ❌ File write in workflow (duplicated on replay)
f, _ := os.OpenFile("log.txt", os.O_APPEND|os.O_WRONLY, 0644)
f.WriteString(fmt.Sprintf("Processing at %s\n", timestamp))
f.Close()
// Logs appended on every replay
result, err := someActivity.Execute(ctx, timestamp.String(), requestID)
return result, err
},
)
Rule of thumb: When in doubt, make it an activity. The downside is minimal and the benefits are significant.
When Replay Happens
1. Event Waiting Resume (WaitEvent())
The most common case is when a workflow resumes after waiting for an external event.
type StartPaymentResult struct {
PaymentID string `json:"payment_id"`
}
type CompleteOrderResult struct {
OrderID string `json:"order_id"`
Status string `json:"status"`
}
var paymentWorkflow = romancy.DefineWorkflow("payment_workflow",
func(ctx *romancy.WorkflowContext, orderID string) (CompleteOrderResult, error) {
// Step 1: Start payment
_, err := startPayment.Execute(ctx, orderID)
if err != nil {
return CompleteOrderResult{}, err
}
// Step 2: Wait for payment completion event
// Workflow pauses here (status="waiting_for_event")
event, err := romancy.WaitEvent(ctx,
romancy.WithEventType("payment.completed"),
romancy.WithTimeout(5*time.Minute),
)
if err != nil {
return CompleteOrderResult{}, err
}
// After event received, resume from here (replay happens)
// Step 3: Complete order
result, err := completeOrder.Execute(ctx, orderID, event)
if err != nil {
return CompleteOrderResult{}, err
}
return result, nil
},
)
Replay behavior:
- workflow.Resume() creates a context with ctx.IsReplaying() = true
- LoadHistory() loads the execution history
- The workflow function runs from the start:
  - startPayment: Skipped (cached result)
  - WaitEvent(): Skipped (cached event data)
  - completeOrder: Executed (new activity)
2. Explicit Resume Call
Developers can manually resume workflows:
// Admin API endpoint
func ResumeWorkflowHandler(w http.ResponseWriter, r *http.Request) {
instanceID := chi.URLParam(r, "instance_id")
ctx := context.Background()
instance, err := app.Storage().GetInstance(ctx, instanceID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
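// Checked type assertion: a missing or non-string value must not panic the handler
workflowName, ok := instance["workflow_name"].(string)
if !ok {
http.Error(w, "workflow_name missing from instance", http.StatusInternalServerError)
return
}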
// Get corresponding workflow
workflow := romancy.GetWorkflow(workflowName)
// Start replay
if err := workflow.Resume(ctx, app, instanceID); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(map[string]string{
"status": "resumed",
"instance_id": instanceID,
})
}
3. Crash Recovery (Automatic)
Romancy automatically recovers from crashes in two stages:
3-1. Stale Lock Cleanup (Implemented)
When a worker process crashes, its locks become “stale.” Romancy automatically cleans these up:
func cleanupStaleLocksPeriodically(storage StorageProtocol, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
// Clean up stale locks (uses lock_expires_at column)
workflowsToResume, err := storage.CleanupStaleLocks(context.Background())
if err != nil {
log.Printf("Failed to cleanup stale locks: %v", err)
continue
}
if len(workflowsToResume) > 0 {
log.Printf("Cleaned up %d stale locks", len(workflowsToResume))
}
}
}
This background task starts automatically when romancy.App launches.
How it works:
- Every 60 seconds, check for stale locks
- Detect expired locks (based on the lock_expires_at column set at lock acquisition)
- Release those locks (locked_by = NULL)
- Return the list of workflows that need to be resumed
Return value structure:
[]map[string]string{
{
"instance_id": "...",
"workflow_name": "...",
"source_hash": "...", // Hash of workflow definition
"status": "...", // "running" or "compensating"
},
// ...
}
The status field indicates whether the workflow was running normally ("running") or executing compensations ("compensating") when it crashed.
3-2. Automatic Workflow Resume (Implemented)
After cleaning stale locks, Romancy automatically resumes workflows with status="running" or status="compensating":
func autoResumeStaleWorkflowsPeriodically(storage StorageProtocol, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
ctx := context.Background()
// Clean up stale locks and get workflows to resume
workflowsToResume, err := storage.CleanupStaleLocks(ctx)
if err != nil {
log.Printf("Failed to cleanup stale locks: %v", err)
continue
}
if len(workflowsToResume) > 0 {
// Auto-resume workflows
for _, wf := range workflowsToResume {
instanceID := wf["instance_id"]
workflowName := wf["workflow_name"]
log.Printf("Auto-resuming: %s (%s)", workflowName, instanceID)
workflow := romancy.GetWorkflow(workflowName)
if err := workflow.Resume(ctx, app, instanceID); err != nil {
log.Printf("Failed to resume %s: %v", instanceID, err)
}
}
}
}
}
Special handling for different workflow states:
- Running workflows (status="running"):
  - Resume normally via workflow.Resume()
  - Full workflow function execution with replay
- Compensating workflows (status="compensating"):
  - Resume via workflow.ResumeCompensating()
  - Only re-execute incomplete compensations (not the workflow function)
  - Ensures compensation transactions complete even after crashes
Source hash verification (Safety mechanism):
Before auto-resuming, Romancy verifies that the workflow definition hasn’t changed:
// Check if workflow definition matches current registry
currentHash := workflow.SourceHash()
storedHash := wf["source_hash"]
if currentHash != storedHash {
// Workflow code has changed - skip auto-resume
log.Printf("Source hash mismatch for %s", workflowName)
continue
}
This prevents incompatible code from executing and ensures crash recovery is safe.
Why this works:
When a worker crashes, workflows with status="running" always hold a stale lock:
| Workflow Status | Lock Held | On Crash | Auto-Resume Strategy |
|---|---|---|---|
| status="running" | YES (inside workflow_lock) | Becomes stale | ✅ Normal resume |
| status="compensating" | YES (inside compensation execution) | Becomes stale | ✅ Compensation resume |
| status="waiting_for_event" | NO (after lock released) | No stale lock | ❌ Event-driven resume |
| status="waiting_for_timer" | NO (after lock released) | No stale lock | ❌ Timer-driven resume |
| status="completed" | NO | No stale lock | N/A |
| status="failed" | NO | No stale lock | N/A |
| status="cancelled" | NO | No stale lock | N/A |
Therefore, cleaning up stale locks and resuming workflows with status="running" or status="compensating" ensures that no interrupted workflow is left unresumed.
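The decision the recovery loop makes for each stale workflow, combining the source hash check with the status table above, fits in one function. decideResume and the action names are hypothetical; the returned action corresponds to calling workflow.Resume(), calling workflow.ResumeCompensating(), or skipping the instance.

```go
package main

import "fmt"

// resumeAction is what a recovery loop would do for one stale workflow (hypothetical).
type resumeAction string

const (
	actionResume             resumeAction = "resume"              // workflow.Resume()
	actionResumeCompensating resumeAction = "resume_compensating" // workflow.ResumeCompensating()
	actionSkip               resumeAction = "skip"
)

// decideResume verifies the source hash first, then dispatches on status.
func decideResume(wf map[string]string, currentHashes map[string]string) (resumeAction, string) {
	current, registered := currentHashes[wf["workflow_name"]]
	if !registered {
		return actionSkip, "workflow not registered"
	}
	if current != wf["source_hash"] {
		return actionSkip, "source hash mismatch"
	}
	switch wf["status"] {
	case "running":
		return actionResume, ""
	case "compensating":
		return actionResumeCompensating, ""
	default:
		// Waiting states are resumed by events or timers; terminal states never are.
		return actionSkip, "status " + wf["status"] + " is not auto-resumed"
	}
}

func main() {
	current := map[string]string{"order_workflow": "abc"}
	stale := []map[string]string{
		{"instance_id": "order-1", "workflow_name": "order_workflow", "source_hash": "abc", "status": "running"},
		{"instance_id": "order-2", "workflow_name": "order_workflow", "source_hash": "abc", "status": "compensating"},
		{"instance_id": "order-3", "workflow_name": "order_workflow", "source_hash": "old", "status": "running"},
	}
	for _, wf := range stale {
		action, reason := decideResume(wf, current)
		fmt.Println(wf["instance_id"], action, reason)
	}
}
```

Checking the hash before the status means an instance whose code changed is never resumed, whichever recovery path it would otherwise take.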
4. Deployment & Scale-Out
Romancy supports distributed execution, so workflows continue during deployment:
Scenario:
- Worker A executing a workflow
- Worker B newly deployed
- Worker A shutdown
- Waiting workflows are taken over by Worker B (resume via replay)
Database-based exclusive control guarantee:
Romancy’s database-based exclusive control prevents multiple workers from executing the same workflow instance simultaneously:
err := app.WithWorkflowLock(ctx, instanceID, workerID, func() error {
// Only execute while lock held
wfCtx := romancy.NewWorkflowContext(instanceID, true, ...)
wfCtx.LoadHistory()
return workflowFunc(wfCtx, inputData)
})
Complete Replay Flow
Initial Execution (Completed Steps 1-2, Crashed at Step 3)
var orderWorkflow = romancy.DefineWorkflow("order_workflow",
func(ctx *romancy.WorkflowContext, orderID string) (OrderWorkflowResult, error) {
// Activity 1 (auto-generated ID: "reserve_inventory:1")
_, err := reserveInventory.Execute(ctx, orderID)
// → DB saved: activity_id="reserve_inventory:1", result={"reservation_id": "R123"}
if err != nil {
return OrderWorkflowResult{}, err
}
// Activity 2 (auto-generated ID: "process_payment:1")
_, err = processPayment.Execute(ctx, orderID)
// → DB saved: activity_id="process_payment:1", result={"transaction_id": "T456"}
if err != nil {
return OrderWorkflowResult{}, err
}
// Activity 3 (auto-generated ID: "arrange_shipping:1")
_, err = arrangeShipping.Execute(ctx, orderID)
// → The worker process crashes here, before any result is recorded
if err != nil {
return OrderWorkflowResult{}, err
}
return OrderWorkflowResult{Status: "completed"}, nil
},
)
DB State:
- workflow_instances.status = "running"
- workflow_instances.current_activity_id = "process_payment:1"
- workflow_history has 2 records
Replay Execution (Resume)
// 1. workflow.Resume() called
err := orderWorkflow.Resume(ctx, app, instanceID)
// 2. Create WorkflowContext (isReplaying=true)
wfCtx := romancy.NewWorkflowContext(
instanceID,
true, // Replay mode
...
)
// 3. Load history
wfCtx.LoadHistory()
// → historyCache = {"reserve_inventory:1": {...}, "process_payment:1": {...}}
// 4. Execute workflow function from start
result, err := orderWorkflow.Execute(wfCtx, orderID)
// 5. Activity: reserve_inventory:1
// - ctx.IsReplaying() == true
// - Cache has activity_id="reserve_inventory:1"
// - Don't execute function, return {"reservation_id": "R123"} from cache
// 6. Activity: process_payment:1
// - ctx.IsReplaying() == true
// - Cache has activity_id="process_payment:1"
// - Don't execute function, return {"transaction_id": "T456"} from cache
// 7. Activity: arrange_shipping:1
// - ctx.IsReplaying() == true
// - No cache for activity_id="arrange_shipping:1"
// - Execute function (new processing)
// - Save result to DB on success
// 8. Workflow complete
wfCtx.UpdateStatus("completed", result)
Safety Mechanisms
Romancy includes several safety mechanisms to ensure reliable execution:
Source Hash Verification
Before auto-resuming crashed workflows, Romancy verifies workflow definition hasn’t changed:
- Each workflow has a source code hash (source_hash)
- Stored in the database when the workflow starts
- Compared with the current registry during auto-resume
- Incompatible code is skipped (prevents unsafe execution)
This prevents:
- Resuming workflows with outdated logic
- Schema mismatches after deployment
- Data corruption from incompatible code changes
Exclusive Control Guarantees
Romancy’s database-based exclusive control prevents concurrent execution:
err := app.WithWorkflowLock(ctx, instanceID, workerID,
romancy.WithLockTimeout(5*time.Minute),
func() error {
// Only one worker can hold this lock
// Other workers wait or skip
return nil
},
)
Features:
- 5-minute timeout by default (prevents indefinite locks)
- Worker ID tracking (know which worker holds the lock)
- Stale lock cleanup (automatic recovery after crashes)
Transactional History Recording
All history recording is transactional:
- Activity completion + history save in single transaction
- Rollback on failure (ensures consistency)
- No orphaned history records
- Deterministic replay guaranteed
Compensating Workflow Recovery
Special handling for workflows that crash during compensation:
- status="compensating" detected during cleanup
- Only incomplete compensations are re-executed
- Workflow function is NOT re-executed
- Ensures compensation transactions complete even after multiple crashes
Summary
Romancy’s replay mechanism characteristics:
| Item | Behavior |
|---|---|
| Completed activities | Skipped (result from cache) |
| Workflow function code | Runs from start every time |
| Control flow (if/for/switch) | Evaluated every time |
| Variable calculations | Executed every time |
| State restoration | Load history from DB → Populate memory cache |
| Determinism guarantee | Non-deterministic operations hidden in activities |
This mechanism ensures workflows can resume accurately after process crashes, deployments, or scale-outs.
See Also
- Saga Pattern: Automatic compensation on workflow failure
- Event Handling: Wait for external events in workflows